
Enable the ability to activate 'Traffic Mode' by holding down the 'distance' button for 2.5 seconds. When 'Traffic Mode' is active the onroad UI will turn red and openpilot will drive catered towards stop and go traffic.
718 lines
27 KiB
Python
718 lines
27 KiB
Python
import json
|
||
import os
|
||
import numpy as np
|
||
import tomllib
|
||
from abc import abstractmethod, ABC
|
||
from difflib import SequenceMatcher
|
||
from enum import StrEnum
|
||
from json import load
|
||
from typing import Any, Callable, Dict, List, NamedTuple, Optional, Tuple, Union
|
||
from collections.abc import Callable
|
||
|
||
from cereal import car
|
||
from openpilot.common.basedir import BASEDIR
|
||
from openpilot.common.conversions import Conversions as CV
|
||
from openpilot.common.simple_kalman import KF1D, get_kalman_gain
|
||
from openpilot.common.numpy_fast import clip
|
||
from openpilot.common.params import Params
|
||
from openpilot.common.realtime import DT_CTRL
|
||
from openpilot.selfdrive.car import apply_hysteresis, gen_empty_fingerprint, scale_rot_inertia, scale_tire_stiffness, STD_CARGO_KG
|
||
from openpilot.selfdrive.car.values import PLATFORMS
|
||
from openpilot.selfdrive.controls.lib.drive_helpers import CRUISE_LONG_PRESS, V_CRUISE_MAX, get_friction
|
||
from openpilot.selfdrive.controls.lib.events import Events
|
||
from openpilot.selfdrive.controls.lib.vehicle_model import VehicleModel
|
||
|
||
ButtonType = car.CarState.ButtonEvent.Type
|
||
GearShifter = car.CarState.GearShifter
|
||
EventName = car.CarEvent.EventName
|
||
|
||
MAX_CTRL_SPEED = (V_CRUISE_MAX + 4) * CV.KPH_TO_MS
|
||
ACCEL_MAX = 2.0
|
||
ACCEL_MAX_PLUS = 4.0
|
||
ACCEL_MIN = -3.5
|
||
FRICTION_THRESHOLD = 0.3
|
||
|
||
NEURAL_PARAMS_PATH = os.path.join(BASEDIR, 'selfdrive/car/torque_data/neural_ff_weights.json')
|
||
TORQUE_PARAMS_PATH = os.path.join(BASEDIR, 'selfdrive/car/torque_data/params.toml')
|
||
TORQUE_OVERRIDE_PATH = os.path.join(BASEDIR, 'selfdrive/car/torque_data/override.toml')
|
||
TORQUE_SUBSTITUTE_PATH = os.path.join(BASEDIR, 'selfdrive/car/torque_data/substitute.toml')
|
||
TORQUE_NN_MODEL_PATH = os.path.join(BASEDIR, 'selfdrive/car/torque_data/lat_models')
|
||
|
||
def similarity(s1:str, s2:str) -> float:
|
||
return SequenceMatcher(None, s1, s2).ratio()
|
||
|
||
class LatControlInputs(NamedTuple):
|
||
lateral_acceleration: float
|
||
roll_compensation: float
|
||
vego: float
|
||
aego: float
|
||
|
||
|
||
TorqueFromLateralAccelCallbackType = Callable[[LatControlInputs, car.CarParams.LateralTorqueTuning, float, float, bool, bool], float]
|
||
|
||
|
||
def get_torque_params(candidate):
|
||
with open(TORQUE_SUBSTITUTE_PATH, 'rb') as f:
|
||
sub = tomllib.load(f)
|
||
if candidate in sub:
|
||
candidate = sub[candidate]
|
||
|
||
with open(TORQUE_PARAMS_PATH, 'rb') as f:
|
||
params = tomllib.load(f)
|
||
with open(TORQUE_OVERRIDE_PATH, 'rb') as f:
|
||
override = tomllib.load(f)
|
||
|
||
# Ensure no overlap
|
||
if sum([candidate in x for x in [sub, params, override]]) > 1:
|
||
raise RuntimeError(f'{candidate} is defined twice in torque config')
|
||
|
||
if candidate in override:
|
||
out = override[candidate]
|
||
elif candidate in params:
|
||
out = params[candidate]
|
||
else:
|
||
raise NotImplementedError(f"Did not find torque params for {candidate}")
|
||
return {key: out[i] for i, key in enumerate(params['legend'])}
|
||
|
||
|
||
# Twilsonco's Lateral Neural Network Feedforward
|
||
class FluxModel:
|
||
# dict used to rename activation functions whose names aren't valid python identifiers
|
||
activation_function_names = {'σ': 'sigmoid'}
|
||
def __init__(self, params_file, zero_bias=False):
|
||
with open(params_file, "r") as f:
|
||
params = load(f)
|
||
|
||
self.input_size = params["input_size"]
|
||
self.output_size = params["output_size"]
|
||
self.input_mean = np.array(params["input_mean"], dtype=np.float32).T
|
||
self.input_std = np.array(params["input_std"], dtype=np.float32).T
|
||
self.layers = []
|
||
|
||
for layer_params in params["layers"]:
|
||
W = np.array(layer_params[next(key for key in layer_params.keys() if key.endswith('_W'))], dtype=np.float32).T
|
||
b = np.array(layer_params[next(key for key in layer_params.keys() if key.endswith('_b'))], dtype=np.float32).T
|
||
if zero_bias:
|
||
b = np.zeros_like(b)
|
||
activation = layer_params["activation"]
|
||
for k, v in self.activation_function_names.items():
|
||
activation = activation.replace(k, v)
|
||
self.layers.append((W, b, activation))
|
||
|
||
self.validate_layers()
|
||
self.check_for_friction_override()
|
||
|
||
# Begin activation functions.
|
||
# These are called by name using the keys in the model json file
|
||
def sigmoid(self, x):
|
||
return 1 / (1 + np.exp(-x))
|
||
|
||
def identity(self, x):
|
||
return x
|
||
# End activation functions
|
||
|
||
def forward(self, x):
|
||
for W, b, activation in self.layers:
|
||
x = getattr(self, activation)(x.dot(W) + b)
|
||
return x
|
||
|
||
def evaluate(self, input_array):
|
||
in_len = len(input_array)
|
||
if in_len != self.input_size:
|
||
# If the input is length 2-4, then it's a simplified evaluation.
|
||
# In that case, need to add on zeros to fill out the input array to match the correct length.
|
||
if 2 <= in_len:
|
||
input_array = input_array + [0] * (self.input_size - in_len)
|
||
else:
|
||
raise ValueError(f"Input array length {len(input_array)} must be length 2 or greater")
|
||
|
||
input_array = np.array(input_array, dtype=np.float32)
|
||
|
||
# Rescale the input array using the input_mean and input_std
|
||
input_array = (input_array - self.input_mean) / self.input_std
|
||
|
||
output_array = self.forward(input_array)
|
||
|
||
return float(output_array[0, 0])
|
||
|
||
def validate_layers(self):
|
||
for W, b, activation in self.layers:
|
||
if not hasattr(self, activation):
|
||
raise ValueError(f"Unknown activation: {activation}")
|
||
|
||
def check_for_friction_override(self):
|
||
y = self.evaluate([10.0, 0.0, 0.2])
|
||
self.friction_override = (y < 0.1)
|
||
|
||
def get_nn_model_path(car, eps_firmware) -> Tuple[Union[str, None, float]]:
|
||
def check_nn_path(check_model):
|
||
model_path = None
|
||
max_similarity = -1.0
|
||
for f in os.listdir(TORQUE_NN_MODEL_PATH):
|
||
if f.endswith(".json") and car in f:
|
||
model = f.replace(".json", "").replace(f"{TORQUE_NN_MODEL_PATH}/", "")
|
||
similarity_score = similarity(model, check_model)
|
||
if similarity_score > max_similarity:
|
||
max_similarity = similarity_score
|
||
model_path = os.path.join(TORQUE_NN_MODEL_PATH, f)
|
||
return model_path, max_similarity
|
||
|
||
if len(eps_firmware) > 3:
|
||
eps_firmware = eps_firmware.replace("\\", "")
|
||
check_model = f"{car} {eps_firmware}"
|
||
else:
|
||
check_model = car
|
||
model_path, max_similarity = check_nn_path(check_model)
|
||
if max_similarity < 0.9:
|
||
check_model = car
|
||
model_path, max_similarity = check_nn_path(check_model)
|
||
if max_similarity < 0.9:
|
||
model_path = None
|
||
return model_path, max_similarity
|
||
|
||
def get_nn_model(car, eps_firmware) -> Tuple[Union[FluxModel, None, float]]:
|
||
model, similarity_score = get_nn_model_path(car, eps_firmware)
|
||
if model is not None:
|
||
model = FluxModel(model)
|
||
return model, similarity_score
|
||
|
||
# generic car and radar interfaces
|
||
|
||
class CarInterfaceBase(ABC):
|
||
def __init__(self, CP, CarController, CarState):
|
||
self.CP = CP
|
||
self.VM = VehicleModel(CP)
|
||
eps_firmware = str(next((fw.fwVersion for fw in CP.carFw if fw.ecu == "eps"), ""))
|
||
|
||
self.frame = 0
|
||
self.steering_unpressed = 0
|
||
self.low_speed_alert = False
|
||
self.no_steer_warning = False
|
||
self.silent_steer_warning = True
|
||
self.v_ego_cluster_seen = False
|
||
|
||
self.CS = None
|
||
self.can_parsers = []
|
||
if CarState is not None:
|
||
self.CS = CarState(CP)
|
||
|
||
self.cp = self.CS.get_can_parser(CP)
|
||
self.cp_cam = self.CS.get_cam_can_parser(CP)
|
||
self.cp_adas = self.CS.get_adas_can_parser(CP)
|
||
self.cp_body = self.CS.get_body_can_parser(CP)
|
||
self.cp_loopback = self.CS.get_loopback_can_parser(CP)
|
||
self.can_parsers = [self.cp, self.cp_cam, self.cp_adas, self.cp_body, self.cp_loopback]
|
||
|
||
self.CC = None
|
||
if CarController is not None:
|
||
self.CC = CarController(self.cp.dbc_name, CP, self.VM)
|
||
|
||
# FrogPilot variables
|
||
self.params = Params()
|
||
self.params_memory = Params("/dev/shm/params")
|
||
|
||
lateral_tune = self.params.get_bool("LateralTune")
|
||
nnff_supported = self.initialize_lat_torque_nn(CP.carFingerprint, eps_firmware)
|
||
use_comma_nnff = self.check_comma_nn_ff_support(CP.carFingerprint)
|
||
self.use_nnff = not use_comma_nnff and nnff_supported and lateral_tune and self.params.get_bool("NNFF")
|
||
self.use_nnff_lite = not use_comma_nnff and not self.use_nnff and lateral_tune and self.params.get_bool("NNFFLite")
|
||
|
||
self.belowSteerSpeed_shown = False
|
||
self.disable_belowSteerSpeed = False
|
||
self.disable_resumeRequired = False
|
||
self.prev_distance_button = False
|
||
self.resumeRequired_shown = False
|
||
self.traffic_mode_changed = False
|
||
|
||
self.gap_counter = 0
|
||
|
||
def get_ff_nn(self, x):
|
||
return self.lat_torque_nn_model.evaluate(x)
|
||
|
||
def check_comma_nn_ff_support(self, car):
|
||
with open(NEURAL_PARAMS_PATH, 'r') as file:
|
||
data = json.load(file)
|
||
return car in data
|
||
|
||
|
||
def initialize_lat_torque_nn(self, car, eps_firmware):
|
||
self.lat_torque_nn_model, _ = get_nn_model(car, eps_firmware)
|
||
return (self.lat_torque_nn_model is not None)
|
||
|
||
@staticmethod
|
||
def get_pid_accel_limits(CP, current_speed, cruise_speed, frogpilot_variables):
|
||
if frogpilot_variables.sport_plus:
|
||
return ACCEL_MIN, ACCEL_MAX_PLUS
|
||
else:
|
||
return ACCEL_MIN, ACCEL_MAX
|
||
|
||
@classmethod
|
||
def get_non_essential_params(cls, candidate: str):
|
||
"""
|
||
Parameters essential to controlling the car may be incomplete or wrong without FW versions or fingerprints.
|
||
"""
|
||
return cls.get_params(candidate, gen_empty_fingerprint(), list(), False, False)
|
||
|
||
@classmethod
|
||
def get_params(cls, params, candidate: str, fingerprint: dict[int, dict[int, int]], car_fw: list[car.CarParams.CarFw], experimental_long: bool, docs: bool):
|
||
ret = CarInterfaceBase.get_std_params(candidate)
|
||
|
||
platform = PLATFORMS[candidate]
|
||
ret.mass = platform.config.specs.mass
|
||
ret.wheelbase = platform.config.specs.wheelbase
|
||
ret.steerRatio = platform.config.specs.steerRatio
|
||
ret.centerToFront = ret.wheelbase * platform.config.specs.centerToFrontRatio
|
||
ret.minEnableSpeed = platform.config.specs.minEnableSpeed
|
||
ret.minSteerSpeed = platform.config.specs.minSteerSpeed
|
||
ret.tireStiffnessFactor = platform.config.specs.tireStiffnessFactor
|
||
ret.flags |= int(platform.config.flags)
|
||
|
||
ret = cls._get_params(ret, params, candidate, fingerprint, car_fw, experimental_long, docs)
|
||
|
||
# Enable torque controller for all cars that do not use angle based steering
|
||
if ret.steerControlType != car.CarParams.SteerControlType.angle and params.get_bool("LateralTune") and params.get_bool("NNFF"):
|
||
CarInterfaceBase.configure_torque_tune(candidate, ret.lateralTuning)
|
||
eps_firmware = str(next((fw.fwVersion for fw in car_fw if fw.ecu == "eps"), ""))
|
||
model, similarity_score = get_nn_model_path(candidate, eps_firmware)
|
||
if model is not None:
|
||
params.put("NNFFModelName", candidate)
|
||
|
||
# Vehicle mass is published curb weight plus assumed payload such as a human driver; notCars have no assumed payload
|
||
if not ret.notCar:
|
||
ret.mass = ret.mass + STD_CARGO_KG
|
||
|
||
# Set params dependent on values set by the car interface
|
||
ret.rotationalInertia = scale_rot_inertia(ret.mass, ret.wheelbase)
|
||
ret.tireStiffnessFront, ret.tireStiffnessRear = scale_tire_stiffness(ret.mass, ret.wheelbase, ret.centerToFront, ret.tireStiffnessFactor)
|
||
|
||
return ret
|
||
|
||
@staticmethod
|
||
@abstractmethod
|
||
def _get_params(ret: car.CarParams, candidate, fingerprint: dict[int, dict[int, int]],
|
||
car_fw: list[car.CarParams.CarFw], experimental_long: bool, docs: bool):
|
||
raise NotImplementedError
|
||
|
||
@staticmethod
|
||
def init(CP, logcan, sendcan):
|
||
pass
|
||
|
||
@staticmethod
|
||
def get_steer_feedforward_default(desired_angle, v_ego):
|
||
# Proportional to realigning tire momentum: lateral acceleration.
|
||
return desired_angle * (v_ego**2)
|
||
|
||
def get_steer_feedforward_function(self):
|
||
return self.get_steer_feedforward_default
|
||
|
||
def torque_from_lateral_accel_linear(self, latcontrol_inputs: LatControlInputs, torque_params: car.CarParams.LateralTorqueTuning,
|
||
lateral_accel_error: float, lateral_accel_deadzone: float, friction_compensation: bool, gravity_adjusted: bool) -> float:
|
||
# The default is a linear relationship between torque and lateral acceleration (accounting for road roll and steering friction)
|
||
friction = get_friction(lateral_accel_error, lateral_accel_deadzone, FRICTION_THRESHOLD, torque_params, friction_compensation)
|
||
return (latcontrol_inputs.lateral_acceleration / float(torque_params.latAccelFactor)) + friction
|
||
|
||
def torque_from_lateral_accel(self) -> TorqueFromLateralAccelCallbackType:
|
||
return self.torque_from_lateral_accel_linear
|
||
|
||
# returns a set of default params to avoid repetition in car specific params
|
||
@staticmethod
|
||
def get_std_params(candidate):
|
||
ret = car.CarParams.new_message()
|
||
ret.carFingerprint = candidate
|
||
|
||
# Car docs fields
|
||
ret.maxLateralAccel = get_torque_params(candidate)['MAX_LAT_ACCEL_MEASURED']
|
||
ret.autoResumeSng = True # describes whether car can resume from a stop automatically
|
||
|
||
# standard ALC params
|
||
ret.tireStiffnessFactor = 1.0
|
||
ret.steerControlType = car.CarParams.SteerControlType.torque
|
||
ret.minSteerSpeed = 0.
|
||
ret.wheelSpeedFactor = 1.0
|
||
|
||
ret.pcmCruise = True # openpilot's state is tied to the PCM's cruise state on most cars
|
||
ret.minEnableSpeed = -1. # enable is done by stock ACC, so ignore this
|
||
ret.steerRatioRear = 0. # no rear steering, at least on the listed cars aboveA
|
||
ret.openpilotLongitudinalControl = False
|
||
ret.stopAccel = -2.0
|
||
ret.stoppingDecelRate = 0.8 # brake_travel/s while trying to stop
|
||
ret.vEgoStopping = 0.5
|
||
ret.vEgoStarting = 0.5
|
||
ret.stoppingControl = True
|
||
ret.longitudinalTuning.deadzoneBP = [0.]
|
||
ret.longitudinalTuning.deadzoneV = [0.]
|
||
ret.longitudinalTuning.kf = 1.
|
||
ret.longitudinalTuning.kpBP = [0.]
|
||
ret.longitudinalTuning.kpV = [1.]
|
||
ret.longitudinalTuning.kiBP = [0.]
|
||
ret.longitudinalTuning.kiV = [1.]
|
||
# TODO estimate car specific lag, use .15s for now
|
||
ret.longitudinalActuatorDelayLowerBound = 0.15
|
||
ret.longitudinalActuatorDelayUpperBound = 0.15
|
||
ret.steerLimitTimer = 1.0
|
||
return ret
|
||
|
||
@staticmethod
|
||
def configure_torque_tune(candidate, tune, steering_angle_deadzone_deg=0.0, use_steering_angle=True):
|
||
params = get_torque_params(candidate)
|
||
|
||
tune.init('torque')
|
||
tune.torque.useSteeringAngle = use_steering_angle
|
||
tune.torque.kp = 1.0
|
||
tune.torque.kf = 1.0
|
||
tune.torque.ki = 0.1
|
||
tune.torque.friction = params['FRICTION']
|
||
tune.torque.latAccelFactor = params['LAT_ACCEL_FACTOR']
|
||
tune.torque.latAccelOffset = 0.0
|
||
tune.torque.steeringAngleDeadzoneDeg = steering_angle_deadzone_deg
|
||
|
||
@abstractmethod
|
||
def _update(self, c: car.CarControl) -> car.CarState:
|
||
pass
|
||
|
||
def update(self, c: car.CarControl, can_strings: list[bytes], frogpilot_variables) -> car.CarState:
|
||
# parse can
|
||
for cp in self.can_parsers:
|
||
if cp is not None:
|
||
cp.update_strings(can_strings)
|
||
|
||
# get CarState
|
||
ret, fp_ret = self._update(c, frogpilot_variables)
|
||
|
||
ret.canValid = all(cp.can_valid for cp in self.can_parsers if cp is not None)
|
||
ret.canTimeout = any(cp.bus_timeout for cp in self.can_parsers if cp is not None)
|
||
|
||
if ret.vEgoCluster == 0.0 and not self.v_ego_cluster_seen:
|
||
ret.vEgoCluster = ret.vEgo
|
||
else:
|
||
self.v_ego_cluster_seen = True
|
||
|
||
# Many cars apply hysteresis to the ego dash speed
|
||
if self.CS is not None:
|
||
ret.vEgoCluster = apply_hysteresis(ret.vEgoCluster, self.CS.out.vEgoCluster, self.CS.cluster_speed_hyst_gap)
|
||
if abs(ret.vEgo) < self.CS.cluster_min_speed:
|
||
ret.vEgoCluster = 0.0
|
||
|
||
if ret.cruiseState.speedCluster == 0:
|
||
ret.cruiseState.speedCluster = ret.cruiseState.speed
|
||
|
||
if self.CP.carName != "mock":
|
||
distance_button = self.CS.distance_button or self.params_memory.get_bool("OnroadDistanceButtonPressed")
|
||
fp_ret.distanceLongPressed = self.frogpilot_distance_functions(distance_button, self.prev_distance_button, frogpilot_variables)
|
||
self.prev_distance_button = distance_button
|
||
|
||
# copy back for next iteration
|
||
reader = ret.as_reader()
|
||
frogpilot_reader = fp_ret.as_reader()
|
||
if self.CS is not None:
|
||
self.CS.out = reader
|
||
|
||
return reader, frogpilot_reader
|
||
|
||
@abstractmethod
|
||
def apply(self, c: car.CarControl, now_nanos: int) -> tuple[car.CarControl.Actuators, list[bytes]]:
|
||
pass
|
||
|
||
def create_common_events(self, cs_out, extra_gears=None, pcm_enable=True, allow_enable=True,
|
||
enable_buttons=(ButtonType.accelCruise, ButtonType.decelCruise)):
|
||
events = Events()
|
||
|
||
if cs_out.doorOpen:
|
||
events.add(EventName.doorOpen)
|
||
if cs_out.seatbeltUnlatched:
|
||
events.add(EventName.seatbeltNotLatched)
|
||
if cs_out.gearShifter != GearShifter.drive and (extra_gears is None or
|
||
cs_out.gearShifter not in extra_gears):
|
||
events.add(EventName.wrongGear)
|
||
if cs_out.gearShifter == GearShifter.reverse:
|
||
events.add(EventName.reverseGear)
|
||
if not cs_out.cruiseState.available:
|
||
events.add(EventName.wrongCarMode)
|
||
if cs_out.espDisabled:
|
||
events.add(EventName.espDisabled)
|
||
if cs_out.stockFcw:
|
||
events.add(EventName.stockFcw)
|
||
if cs_out.stockAeb:
|
||
events.add(EventName.stockAeb)
|
||
if cs_out.vEgo > MAX_CTRL_SPEED:
|
||
events.add(EventName.speedTooHigh)
|
||
if cs_out.cruiseState.nonAdaptive:
|
||
events.add(EventName.wrongCruiseMode)
|
||
if cs_out.brakeHoldActive and self.CP.openpilotLongitudinalControl:
|
||
events.add(EventName.brakeHold)
|
||
if cs_out.parkingBrake:
|
||
events.add(EventName.parkBrake)
|
||
if cs_out.accFaulted:
|
||
events.add(EventName.accFaulted)
|
||
if cs_out.steeringPressed:
|
||
events.add(EventName.steerOverride)
|
||
|
||
# Handle button presses
|
||
for b in cs_out.buttonEvents:
|
||
# Enable OP long on falling edge of enable buttons (defaults to accelCruise and decelCruise, overridable per-port)
|
||
if not self.CP.pcmCruise and (b.type in enable_buttons and not b.pressed):
|
||
events.add(EventName.buttonEnable)
|
||
# Disable on rising and falling edge of cancel for both stock and OP long
|
||
if b.type == ButtonType.cancel:
|
||
events.add(EventName.buttonCancel)
|
||
|
||
# Handle permanent and temporary steering faults
|
||
self.steering_unpressed = 0 if cs_out.steeringPressed else self.steering_unpressed + 1
|
||
if cs_out.steerFaultTemporary:
|
||
if cs_out.steeringPressed and (not self.CS.out.steerFaultTemporary or self.no_steer_warning):
|
||
self.no_steer_warning = True
|
||
else:
|
||
self.no_steer_warning = False
|
||
|
||
# if the user overrode recently, show a less harsh alert
|
||
if self.silent_steer_warning or cs_out.standstill or self.steering_unpressed < int(1.5 / DT_CTRL):
|
||
self.silent_steer_warning = True
|
||
events.add(EventName.steerTempUnavailableSilent)
|
||
else:
|
||
events.add(EventName.steerTempUnavailable)
|
||
else:
|
||
self.no_steer_warning = False
|
||
self.silent_steer_warning = False
|
||
if cs_out.steerFaultPermanent:
|
||
events.add(EventName.steerUnavailable)
|
||
|
||
# we engage when pcm is active (rising edge)
|
||
# enabling can optionally be blocked by the car interface
|
||
if pcm_enable:
|
||
if cs_out.cruiseState.enabled and not self.CS.out.cruiseState.enabled and allow_enable:
|
||
events.add(EventName.pcmEnable)
|
||
elif not cs_out.cruiseState.enabled:
|
||
events.add(EventName.pcmDisable)
|
||
|
||
return events
|
||
|
||
def frogpilot_distance_functions(self, distance_button, prev_distance_button, frogpilot_variables):
|
||
if distance_button:
|
||
self.gap_counter += 1
|
||
elif not prev_distance_button:
|
||
self.gap_counter = 0
|
||
|
||
if self.gap_counter == CRUISE_LONG_PRESS * 1.5 and frogpilot_variables.experimental_mode_via_distance or self.traffic_mode_changed:
|
||
if frogpilot_variables.conditional_experimental_mode:
|
||
conditional_status = self.params_memory.get_int("CEStatus")
|
||
override_value = 0 if conditional_status in {1, 2, 3, 4, 5, 6} else 1 if conditional_status >= 7 else 2
|
||
self.params_memory.put_int("CEStatus", override_value)
|
||
else:
|
||
experimental_mode = self.params.get_bool("ExperimentalMode")
|
||
self.params.put_bool("ExperimentalMode", not experimental_mode)
|
||
self.traffic_mode_changed = False
|
||
|
||
if self.gap_counter == CRUISE_LONG_PRESS * 5 and frogpilot_variables.traffic_mode:
|
||
traffic_mode = self.params_memory.get_bool("TrafficModeActive")
|
||
self.params_memory.put_bool("TrafficModeActive", not traffic_mode)
|
||
self.traffic_mode_changed = frogpilot_variables.experimental_mode_via_distance
|
||
|
||
return self.gap_counter >= CRUISE_LONG_PRESS
|
||
|
||
class RadarInterfaceBase(ABC):
|
||
def __init__(self, CP):
|
||
self.rcp = None
|
||
self.pts = {}
|
||
self.delay = 0
|
||
self.radar_ts = CP.radarTimeStep
|
||
self.frame = 0
|
||
|
||
def update(self, can_strings):
|
||
self.frame += 1
|
||
if (self.frame % int(100 * self.radar_ts)) == 0:
|
||
return car.RadarData.new_message()
|
||
return None
|
||
|
||
|
||
class CarStateBase(ABC):
|
||
def __init__(self, CP):
|
||
self.CP = CP
|
||
self.car_fingerprint = CP.carFingerprint
|
||
self.out = car.CarState.new_message()
|
||
|
||
self.cruise_buttons = 0
|
||
self.left_blinker_cnt = 0
|
||
self.right_blinker_cnt = 0
|
||
self.steering_pressed_cnt = 0
|
||
self.left_blinker_prev = False
|
||
self.right_blinker_prev = False
|
||
self.cluster_speed_hyst_gap = 0.0
|
||
self.cluster_min_speed = 0.0 # min speed before dropping to 0
|
||
|
||
Q = [[0.0, 0.0], [0.0, 100.0]]
|
||
R = 0.3
|
||
A = [[1.0, DT_CTRL], [0.0, 1.0]]
|
||
C = [[1.0, 0.0]]
|
||
x0=[[0.0], [0.0]]
|
||
K = get_kalman_gain(DT_CTRL, np.array(A), np.array(C), np.array(Q), R)
|
||
self.v_ego_kf = KF1D(x0=x0, A=A, C=C[0], K=K)
|
||
|
||
# FrogPilot variables
|
||
self.lkas_enabled = False
|
||
self.lkas_previously_enabled = False
|
||
|
||
self.prev_distance_button = 0
|
||
self.distance_button = 0
|
||
|
||
def update_speed_kf(self, v_ego_raw):
|
||
if abs(v_ego_raw - self.v_ego_kf.x[0][0]) > 2.0: # Prevent large accelerations when car starts at non zero speed
|
||
self.v_ego_kf.set_x([[v_ego_raw], [0.0]])
|
||
|
||
v_ego_x = self.v_ego_kf.update(v_ego_raw)
|
||
return float(v_ego_x[0]), float(v_ego_x[1])
|
||
|
||
def get_wheel_speeds(self, fl, fr, rl, rr, unit=CV.KPH_TO_MS):
|
||
factor = unit * self.CP.wheelSpeedFactor
|
||
|
||
wheelSpeeds = car.CarState.WheelSpeeds.new_message()
|
||
wheelSpeeds.fl = fl * factor
|
||
wheelSpeeds.fr = fr * factor
|
||
wheelSpeeds.rl = rl * factor
|
||
wheelSpeeds.rr = rr * factor
|
||
return wheelSpeeds
|
||
|
||
def update_blinker_from_lamp(self, blinker_time: int, left_blinker_lamp: bool, right_blinker_lamp: bool):
|
||
"""Update blinkers from lights. Enable output when light was seen within the last `blinker_time`
|
||
iterations"""
|
||
# TODO: Handle case when switching direction. Now both blinkers can be on at the same time
|
||
self.left_blinker_cnt = blinker_time if left_blinker_lamp else max(self.left_blinker_cnt - 1, 0)
|
||
self.right_blinker_cnt = blinker_time if right_blinker_lamp else max(self.right_blinker_cnt - 1, 0)
|
||
return self.left_blinker_cnt > 0, self.right_blinker_cnt > 0
|
||
|
||
def update_steering_pressed(self, steering_pressed, steering_pressed_min_count):
|
||
"""Applies filtering on steering pressed for noisy driver torque signals."""
|
||
self.steering_pressed_cnt += 1 if steering_pressed else -1
|
||
self.steering_pressed_cnt = clip(self.steering_pressed_cnt, 0, steering_pressed_min_count * 2)
|
||
return self.steering_pressed_cnt > steering_pressed_min_count
|
||
|
||
def update_blinker_from_stalk(self, blinker_time: int, left_blinker_stalk: bool, right_blinker_stalk: bool):
|
||
"""Update blinkers from stalk position. When stalk is seen the blinker will be on for at least blinker_time,
|
||
or until the stalk is turned off, whichever is longer. If the opposite stalk direction is seen the blinker
|
||
is forced to the other side. On a rising edge of the stalk the timeout is reset."""
|
||
|
||
if left_blinker_stalk:
|
||
self.right_blinker_cnt = 0
|
||
if not self.left_blinker_prev:
|
||
self.left_blinker_cnt = blinker_time
|
||
|
||
if right_blinker_stalk:
|
||
self.left_blinker_cnt = 0
|
||
if not self.right_blinker_prev:
|
||
self.right_blinker_cnt = blinker_time
|
||
|
||
self.left_blinker_cnt = max(self.left_blinker_cnt - 1, 0)
|
||
self.right_blinker_cnt = max(self.right_blinker_cnt - 1, 0)
|
||
|
||
self.left_blinker_prev = left_blinker_stalk
|
||
self.right_blinker_prev = right_blinker_stalk
|
||
|
||
return bool(left_blinker_stalk or self.left_blinker_cnt > 0), bool(right_blinker_stalk or self.right_blinker_cnt > 0)
|
||
|
||
@staticmethod
|
||
def parse_gear_shifter(gear: str | None) -> car.CarState.GearShifter:
|
||
if gear is None:
|
||
return GearShifter.unknown
|
||
|
||
d: dict[str, car.CarState.GearShifter] = {
|
||
'P': GearShifter.park, 'PARK': GearShifter.park,
|
||
'R': GearShifter.reverse, 'REVERSE': GearShifter.reverse,
|
||
'N': GearShifter.neutral, 'NEUTRAL': GearShifter.neutral,
|
||
'E': GearShifter.eco, 'ECO': GearShifter.eco,
|
||
'T': GearShifter.manumatic, 'MANUAL': GearShifter.manumatic,
|
||
'D': GearShifter.drive, 'DRIVE': GearShifter.drive,
|
||
'S': GearShifter.sport, 'SPORT': GearShifter.sport,
|
||
'L': GearShifter.low, 'LOW': GearShifter.low,
|
||
'B': GearShifter.brake, 'BRAKE': GearShifter.brake,
|
||
}
|
||
return d.get(gear.upper(), GearShifter.unknown)
|
||
|
||
@staticmethod
|
||
def get_cam_can_parser(CP):
|
||
return None
|
||
|
||
@staticmethod
|
||
def get_adas_can_parser(CP):
|
||
return None
|
||
|
||
@staticmethod
|
||
def get_body_can_parser(CP):
|
||
return None
|
||
|
||
@staticmethod
|
||
def get_loopback_can_parser(CP):
|
||
return None
|
||
|
||
|
||
SendCan = tuple[int, int, bytes, int]
|
||
|
||
|
||
class CarControllerBase(ABC):
|
||
@abstractmethod
|
||
def update(self, CC, CS, now_nanos) -> tuple[car.CarControl.Actuators, list[SendCan]]:
|
||
pass
|
||
|
||
|
||
INTERFACE_ATTR_FILE = {
|
||
"FINGERPRINTS": "fingerprints",
|
||
"FW_VERSIONS": "fingerprints",
|
||
}
|
||
|
||
# interface-specific helpers
|
||
|
||
def get_interface_attr(attr: str, combine_brands: bool = False, ignore_none: bool = False) -> dict[str | StrEnum, Any]:
|
||
# read all the folders in selfdrive/car and return a dict where:
|
||
# - keys are all the car models or brand names
|
||
# - values are attr values from all car folders
|
||
result = {}
|
||
for car_folder in sorted([x[0] for x in os.walk(BASEDIR + '/selfdrive/car')]):
|
||
try:
|
||
brand_name = car_folder.split('/')[-1]
|
||
brand_values = __import__(f'openpilot.selfdrive.car.{brand_name}.{INTERFACE_ATTR_FILE.get(attr, "values")}', fromlist=[attr])
|
||
if hasattr(brand_values, attr) or not ignore_none:
|
||
attr_data = getattr(brand_values, attr, None)
|
||
else:
|
||
continue
|
||
|
||
if combine_brands:
|
||
if isinstance(attr_data, dict):
|
||
for f, v in attr_data.items():
|
||
result[f] = v
|
||
else:
|
||
result[brand_name] = attr_data
|
||
except (ImportError, OSError):
|
||
pass
|
||
|
||
return result
|
||
|
||
|
||
class NanoFFModel:
|
||
def __init__(self, weights_loc: str, platform: str):
|
||
self.weights_loc = weights_loc
|
||
self.platform = platform
|
||
self.load_weights(platform)
|
||
|
||
def load_weights(self, platform: str):
|
||
with open(self.weights_loc) as fob:
|
||
self.weights = {k: np.array(v) for k, v in json.load(fob)[platform].items()}
|
||
|
||
def relu(self, x: np.ndarray):
|
||
return np.maximum(0.0, x)
|
||
|
||
def forward(self, x: np.ndarray):
|
||
assert x.ndim == 1
|
||
x = (x - self.weights['input_norm_mat'][:, 0]) / (self.weights['input_norm_mat'][:, 1] - self.weights['input_norm_mat'][:, 0])
|
||
x = self.relu(np.dot(x, self.weights['w_1']) + self.weights['b_1'])
|
||
x = self.relu(np.dot(x, self.weights['w_2']) + self.weights['b_2'])
|
||
x = self.relu(np.dot(x, self.weights['w_3']) + self.weights['b_3'])
|
||
x = np.dot(x, self.weights['w_4']) + self.weights['b_4']
|
||
return x
|
||
|
||
def predict(self, x: list[float], do_sample: bool = False):
|
||
x = self.forward(np.array(x))
|
||
if do_sample:
|
||
pred = np.random.laplace(x[0], np.exp(x[1]) / self.weights['temperature'])
|
||
else:
|
||
pred = x[0]
|
||
pred = pred * (self.weights['output_norm_mat'][1] - self.weights['output_norm_mat'][0]) + self.weights['output_norm_mat'][0]
|
||
return pred
|