carrot/opendbc_repo/opendbc/car/car_helpers.py

210 lines
8.0 KiB
Python
Raw Normal View History

import os
import time
from opendbc.car import gen_empty_fingerprint
from opendbc.car.can_definitions import CanRecvCallable, CanSendCallable
from opendbc.car.carlog import carlog
from opendbc.car.structs import CarParams, CarParamsT
from opendbc.car.fingerprints import eliminate_incompatible_cars, all_legacy_fingerprint_cars
from opendbc.car.fw_versions import ObdCallback, get_fw_versions_ordered, get_present_ecus, match_fw_to_car
from opendbc.car.mock.values import CAR as MOCK
from opendbc.car.values import BRANDS
from opendbc.car.vin import get_vin, is_valid_vin, VIN_UNKNOWN
2024-09-03 16:09:00 +09:00
from common.params import Params
FRAME_FINGERPRINT = 100 # 1s
def load_interfaces(brand_names):
ret = {}
for brand_name in brand_names:
path = f'opendbc.car.{brand_name}'
CarInterface = __import__(path + '.interface', fromlist=['CarInterface']).CarInterface
for model_name in brand_names[brand_name]:
2025-04-18 20:38:55 +09:00
ret[model_name] = CarInterface
return ret
def _get_interface_names() -> dict[str, list[str]]:
# returns a dict of brand name and its respective models
brand_names = {}
for brand in BRANDS:
brand_name = brand.__module__.split('.')[-2]
brand_names[brand_name] = [model.value for model in brand]
return brand_names
# imports from directory opendbc/car/<name>/
interface_names = _get_interface_names()
interfaces = load_interfaces(interface_names)
def can_fingerprint(can_recv: CanRecvCallable) -> tuple[str | None, dict[int, dict]]:
finger = gen_empty_fingerprint()
candidate_cars = {i: all_legacy_fingerprint_cars() for i in [0, 1]} # attempt fingerprint on both bus 0 and 1
frame = 0
car_fingerprint = None
done = False
while not done:
# can_recv(wait_for_one=True) may return zero or multiple packets, so we increment frame for each one we receive
can_packets = can_recv(wait_for_one=True)
for can_packet in can_packets:
for can in can_packet:
# The fingerprint dict is generated for all buses, this way the car interface
# can use it to detect a (valid) multipanda setup and initialize accordingly
if can.src < 128:
if can.src not in finger:
finger[can.src] = {}
finger[can.src][can.address] = len(can.dat)
for b in candidate_cars:
# Ignore extended messages and VIN query response.
if can.src == b and can.address < 0x800 and can.address not in (0x7df, 0x7e0, 0x7e8):
candidate_cars[b] = eliminate_incompatible_cars(can, candidate_cars[b])
# if we only have one car choice and the time since we got our first
# message has elapsed, exit
for b in candidate_cars:
if len(candidate_cars[b]) == 1 and frame > FRAME_FINGERPRINT:
# fingerprint done
car_fingerprint = candidate_cars[b][0]
# bail if no cars left or we've been waiting for more than 2s
failed = (all(len(cc) == 0 for cc in candidate_cars.values()) and frame > FRAME_FINGERPRINT) or frame > 200
succeeded = car_fingerprint is not None
done = failed or succeeded
frame += 1
return car_fingerprint, finger
# **** for use live only ****
def fingerprint(can_recv: CanRecvCallable, can_send: CanSendCallable, set_obd_multiplexing: ObdCallback, num_pandas: int,
cached_params: CarParamsT | None) -> tuple[str | None, dict, str, list[CarParams.CarFw], CarParams.FingerprintSource, bool]:
fixed_fingerprint = os.environ.get('FINGERPRINT', "")
skip_fw_query = os.environ.get('SKIP_FW_QUERY', False)
disable_fw_cache = os.environ.get('DISABLE_FW_CACHE', False)
ecu_rx_addrs = set()
start_time = time.monotonic()
if not skip_fw_query:
if cached_params is not None and cached_params.brand != "mock" and len(cached_params.carFw) > 0 and \
cached_params.carVin is not VIN_UNKNOWN and not disable_fw_cache:
carlog.warning("Using cached CarParams")
vin_rx_addr, vin_rx_bus, vin = -1, -1, cached_params.carVin
car_fw = list(cached_params.carFw)
cached = True
else:
carlog.warning("Getting VIN & FW versions")
# enable OBD multiplexing for VIN query
# NOTE: this takes ~0.1s and is relied on to allow sendcan subscriber to connect in time
set_obd_multiplexing(True)
# VIN query only reliably works through OBDII
vin_rx_addr, vin_rx_bus, vin = get_vin(can_recv, can_send, (0, 1))
ecu_rx_addrs = get_present_ecus(can_recv, can_send, set_obd_multiplexing, num_pandas=num_pandas)
car_fw = get_fw_versions_ordered(can_recv, can_send, set_obd_multiplexing, vin, ecu_rx_addrs, num_pandas=num_pandas)
cached = False
exact_fw_match, fw_candidates = match_fw_to_car(car_fw, vin)
else:
vin_rx_addr, vin_rx_bus, vin = -1, -1, VIN_UNKNOWN
exact_fw_match, fw_candidates, car_fw = True, set(), []
cached = False
if not is_valid_vin(vin):
carlog.error({"event": "Malformed VIN", "vin": vin})
vin = VIN_UNKNOWN
carlog.warning("VIN %s", vin)
# disable OBD multiplexing for CAN fingerprinting and potential ECU knockouts
set_obd_multiplexing(False)
fw_query_time = time.monotonic() - start_time
# CAN fingerprint
# drain CAN socket so we get the latest messages
can_recv()
car_fingerprint, finger = can_fingerprint(can_recv)
exact_match = True
source = CarParams.FingerprintSource.can
# If FW query returns exactly 1 candidate, use it
if len(fw_candidates) == 1:
car_fingerprint = list(fw_candidates)[0]
source = CarParams.FingerprintSource.fw
exact_match = exact_fw_match
if fixed_fingerprint:
car_fingerprint = fixed_fingerprint
source = CarParams.FingerprintSource.fixed
carlog.error({"event": "fingerprinted", "car_fingerprint": str(car_fingerprint), "source": source, "fuzzy": not exact_match,
"cached": cached, "fw_count": len(car_fw), "ecu_responses": list(ecu_rx_addrs), "vin_rx_addr": vin_rx_addr,
"vin_rx_bus": vin_rx_bus, "fingerprints": repr(finger), "fw_query_time": fw_query_time})
return car_fingerprint, finger, vin, car_fw, source, exact_match
2025-04-18 20:38:55 +09:00
def get_car(can_recv: CanRecvCallable, can_send: CanSendCallable, set_obd_multiplexing: ObdCallback, alpha_long_allowed: bool,
num_pandas: int = 1, cached_params: CarParamsT | None = None):
candidate, fingerprints, vin, car_fw, source, exact_match = fingerprint(can_recv, can_send, set_obd_multiplexing, num_pandas, cached_params)
if candidate is None:
carlog.error({"event": "car doesn't match any fingerprints", "fingerprints": repr(fingerprints)})
candidate = "MOCK"
2024-09-03 16:09:00 +09:00
selected_car = Params().get("CarSelected3")
if selected_car:
def find_car(name: str):
from opendbc.car.hyundai.values import CAR as HYUNDAI
from opendbc.car.gm.values import CAR as GM
from opendbc.car.toyota.values import CAR as TOYOTA
from opendbc.car.mazda.values import CAR as MAZDA
for platform in GM:
for doc in platform.config.car_docs:
if name == doc.name:
return platform
for platform in TOYOTA:
for doc in platform.config.car_docs:
if name == doc.name:
return platform
for platform in HYUNDAI:
for doc in platform.config.car_docs:
if name == doc.name:
return platform
for platform in MAZDA:
for doc in platform.config.car_docs:
if name == doc.name:
return platform
return None
found_car = find_car(selected_car.decode("utf-8"))
if found_car is not None:
candidate = found_car
print(f"SelectedCar = {candidate}")
Params().put("CarName", candidate)
2025-04-18 20:38:55 +09:00
CarInterface = interfaces[candidate]
CP: CarParams = CarInterface.get_params(candidate, fingerprints, car_fw, alpha_long_allowed, docs=False)
CP.carVin = vin
CP.carFw = car_fw
CP.fingerprintSource = source
CP.fuzzyFingerprint = not exact_match
2024-09-03 16:09:00 +09:00
print("Carrot GitBranch = {}, {}".format(Params().get("GitBranch"), Params().get("GitCommitDate")))
2025-04-18 20:38:55 +09:00
return interfaces[CP.carFingerprint](CP)
def get_demo_car_params():
platform = MOCK.MOCK
2025-04-18 20:38:55 +09:00
CarInterface = interfaces[platform]
CP = CarInterface.get_non_essential_params(platform)
return CP