
Added function to automatically detect the vehicle's fingerprint if it exists, or an option to force the vehicle's fingerprint if it doesn't.
252 lines
9.2 KiB
Python
252 lines
9.2 KiB
Python
import os
|
|
import threading
|
|
import time
|
|
from collections.abc import Callable
|
|
|
|
from cereal import car
|
|
from openpilot.common.params import Params
|
|
from openpilot.common.basedir import BASEDIR
|
|
from openpilot.system.version import get_short_branch, is_comma_remote, is_tested_branch
|
|
from openpilot.selfdrive.car.interfaces import get_interface_attr
|
|
from openpilot.selfdrive.car.fingerprints import eliminate_incompatible_cars, all_legacy_fingerprint_cars
|
|
from openpilot.selfdrive.car.vin import get_vin, is_valid_vin, VIN_UNKNOWN
|
|
from openpilot.selfdrive.car.fw_versions import get_fw_versions_ordered, get_present_ecus, match_fw_to_car, set_obd_multiplexing
|
|
from openpilot.selfdrive.car.mock.values import CAR as MOCK
|
|
from openpilot.common.swaglog import cloudlog
|
|
import cereal.messaging as messaging
|
|
import openpilot.selfdrive.sentry as sentry
|
|
from openpilot.selfdrive.car import gen_empty_fingerprint
|
|
|
|
FRAME_FINGERPRINT = 100 # 1s
|
|
|
|
EventName = car.CarEvent.EventName
|
|
|
|
|
|
def get_startup_event(car_recognized, controller_available, fw_seen):
|
|
if is_comma_remote() and is_tested_branch():
|
|
event = EventName.startup
|
|
else:
|
|
event = EventName.startupMaster
|
|
|
|
if not car_recognized:
|
|
if fw_seen:
|
|
event = EventName.startupNoCar
|
|
else:
|
|
event = EventName.startupNoFw
|
|
elif car_recognized and not controller_available:
|
|
event = EventName.startupNoControl
|
|
return event
|
|
|
|
|
|
def get_one_can(logcan):
|
|
while True:
|
|
can = messaging.recv_one_retry(logcan)
|
|
if len(can.can) > 0:
|
|
return can
|
|
|
|
|
|
def load_interfaces(brand_names):
|
|
ret = {}
|
|
for brand_name in brand_names:
|
|
path = f'openpilot.selfdrive.car.{brand_name}'
|
|
CarInterface = __import__(path + '.interface', fromlist=['CarInterface']).CarInterface
|
|
|
|
if os.path.exists(BASEDIR + '/' + path.replace('.', '/') + '/carstate.py'):
|
|
CarState = __import__(path + '.carstate', fromlist=['CarState']).CarState
|
|
else:
|
|
CarState = None
|
|
|
|
if os.path.exists(BASEDIR + '/' + path.replace('.', '/') + '/carcontroller.py'):
|
|
CarController = __import__(path + '.carcontroller', fromlist=['CarController']).CarController
|
|
else:
|
|
CarController = None
|
|
|
|
for model_name in brand_names[brand_name]:
|
|
ret[model_name] = (CarInterface, CarController, CarState)
|
|
return ret
|
|
|
|
|
|
def _get_interface_names() -> dict[str, list[str]]:
|
|
# returns a dict of brand name and its respective models
|
|
brand_names = {}
|
|
for brand_name, brand_models in get_interface_attr("CAR").items():
|
|
brand_names[brand_name] = [model.value for model in brand_models]
|
|
|
|
return brand_names
|
|
|
|
|
|
# imports from directory selfdrive/car/<name>/
|
|
interface_names = _get_interface_names()
|
|
interfaces = load_interfaces(interface_names)
|
|
|
|
|
|
def can_fingerprint(next_can: Callable) -> tuple[str | None, dict[int, dict]]:
|
|
finger = gen_empty_fingerprint()
|
|
candidate_cars = {i: all_legacy_fingerprint_cars() for i in [0, 1]} # attempt fingerprint on both bus 0 and 1
|
|
frame = 0
|
|
car_fingerprint = None
|
|
done = False
|
|
|
|
while not done:
|
|
a = next_can()
|
|
|
|
for can in a.can:
|
|
# The fingerprint dict is generated for all buses, this way the car interface
|
|
# can use it to detect a (valid) multipanda setup and initialize accordingly
|
|
if can.src < 128:
|
|
if can.src not in finger:
|
|
finger[can.src] = {}
|
|
finger[can.src][can.address] = len(can.dat)
|
|
|
|
for b in candidate_cars:
|
|
# Ignore extended messages and VIN query response.
|
|
if can.src == b and can.address < 0x800 and can.address not in (0x7df, 0x7e0, 0x7e8):
|
|
candidate_cars[b] = eliminate_incompatible_cars(can, candidate_cars[b])
|
|
|
|
# if we only have one car choice and the time since we got our first
|
|
# message has elapsed, exit
|
|
for b in candidate_cars:
|
|
if len(candidate_cars[b]) == 1 and frame > FRAME_FINGERPRINT:
|
|
# fingerprint done
|
|
car_fingerprint = candidate_cars[b][0]
|
|
|
|
# bail if no cars left or we've been waiting for more than 2s
|
|
failed = (all(len(cc) == 0 for cc in candidate_cars.values()) and frame > FRAME_FINGERPRINT) or frame > 200
|
|
succeeded = car_fingerprint is not None
|
|
done = failed or succeeded
|
|
|
|
frame += 1
|
|
|
|
return car_fingerprint, finger
|
|
|
|
|
|
# **** for use live only ****
|
|
def fingerprint(logcan, sendcan, num_pandas):
|
|
fixed_fingerprint = os.environ.get('FINGERPRINT', "")
|
|
skip_fw_query = os.environ.get('SKIP_FW_QUERY', False)
|
|
disable_fw_cache = os.environ.get('DISABLE_FW_CACHE', False)
|
|
ecu_rx_addrs = set()
|
|
params = Params()
|
|
|
|
start_time = time.monotonic()
|
|
if not skip_fw_query:
|
|
cached_params = params.get("CarParamsCache")
|
|
if cached_params is not None:
|
|
with car.CarParams.from_bytes(cached_params) as cached_params:
|
|
if cached_params.carName == "mock":
|
|
cached_params = None
|
|
|
|
if cached_params is not None and len(cached_params.carFw) > 0 and \
|
|
cached_params.carVin is not VIN_UNKNOWN and not disable_fw_cache:
|
|
cloudlog.warning("Using cached CarParams")
|
|
vin_rx_addr, vin_rx_bus, vin = -1, -1, cached_params.carVin
|
|
car_fw = list(cached_params.carFw)
|
|
cached = True
|
|
else:
|
|
cloudlog.warning("Getting VIN & FW versions")
|
|
# enable OBD multiplexing for VIN query
|
|
# NOTE: this takes ~0.1s and is relied on to allow sendcan subscriber to connect in time
|
|
set_obd_multiplexing(params, True)
|
|
# VIN query only reliably works through OBDII
|
|
vin_rx_addr, vin_rx_bus, vin = get_vin(logcan, sendcan, (0, 1))
|
|
ecu_rx_addrs = get_present_ecus(logcan, sendcan, num_pandas=num_pandas)
|
|
car_fw = get_fw_versions_ordered(logcan, sendcan, ecu_rx_addrs, num_pandas=num_pandas)
|
|
cached = False
|
|
|
|
exact_fw_match, fw_candidates = match_fw_to_car(car_fw)
|
|
else:
|
|
vin_rx_addr, vin_rx_bus, vin = -1, -1, VIN_UNKNOWN
|
|
exact_fw_match, fw_candidates, car_fw = True, set(), []
|
|
cached = False
|
|
|
|
if not is_valid_vin(vin):
|
|
cloudlog.event("Malformed VIN", vin=vin, error=True)
|
|
vin = VIN_UNKNOWN
|
|
cloudlog.warning("VIN %s", vin)
|
|
params.put("CarVin", vin)
|
|
|
|
# disable OBD multiplexing for CAN fingerprinting and potential ECU knockouts
|
|
set_obd_multiplexing(params, False)
|
|
params.put_bool("FirmwareQueryDone", True)
|
|
|
|
fw_query_time = time.monotonic() - start_time
|
|
|
|
# CAN fingerprint
|
|
# drain CAN socket so we get the latest messages
|
|
messaging.drain_sock_raw(logcan)
|
|
car_fingerprint, finger = can_fingerprint(lambda: get_one_can(logcan))
|
|
|
|
exact_match = True
|
|
source = car.CarParams.FingerprintSource.can
|
|
|
|
# If FW query returns exactly 1 candidate, use it
|
|
if len(fw_candidates) == 1:
|
|
car_fingerprint = list(fw_candidates)[0]
|
|
source = car.CarParams.FingerprintSource.fw
|
|
exact_match = exact_fw_match
|
|
|
|
if fixed_fingerprint:
|
|
car_fingerprint = fixed_fingerprint
|
|
source = car.CarParams.FingerprintSource.fixed
|
|
|
|
cloudlog.event("fingerprinted", car_fingerprint=car_fingerprint, source=source, fuzzy=not exact_match, cached=cached,
|
|
fw_count=len(car_fw), ecu_responses=list(ecu_rx_addrs), vin_rx_addr=vin_rx_addr, vin_rx_bus=vin_rx_bus,
|
|
fingerprints=repr(finger), fw_query_time=fw_query_time, error=True)
|
|
|
|
return car_fingerprint, finger, vin, car_fw, source, exact_match
|
|
|
|
|
|
def get_car_interface(CP):
|
|
CarInterface, CarController, CarState = interfaces[CP.carFingerprint]
|
|
return CarInterface(CP, CarController, CarState)
|
|
|
|
|
|
def get_car(params, logcan, sendcan, disable_openpilot_long, experimental_long_allowed, num_pandas=1):
|
|
car_brand = params.get("CarMake", encoding='utf-8')
|
|
car_model = params.get("CarModel", encoding='utf-8')
|
|
|
|
force_fingerprint = params.get_bool("ForceFingerprint")
|
|
|
|
candidate, fingerprints, vin, car_fw, source, exact_match = fingerprint(logcan, sendcan, num_pandas)
|
|
|
|
if candidate is None or force_fingerprint:
|
|
if car_model is not None:
|
|
candidate = car_model
|
|
else:
|
|
cloudlog.event("car doesn't match any fingerprints", fingerprints=repr(fingerprints), error=True)
|
|
candidate = "mock"
|
|
|
|
if car_model is None and candidate != "mock":
|
|
params.put("CarMake", candidate.split(' ')[0].title())
|
|
params.put("CarModel", candidate)
|
|
|
|
if get_short_branch() == "FrogPilot-Development" and not Params("/persist/params").get_bool("FrogsGoMoo"):
|
|
cloudlog.event("Blocked user from using the 'FrogPilot-Development' branch", fingerprints=repr(fingerprints), error=True)
|
|
candidate = "mock"
|
|
fingerprint_log = threading.Thread(target=sentry.capture_fingerprint, args=(candidate, params, True,))
|
|
fingerprint_log.start()
|
|
elif not params.get_bool("FingerprintLogged"):
|
|
fingerprint_log = threading.Thread(target=sentry.capture_fingerprint, args=(candidate, params,))
|
|
fingerprint_log.start()
|
|
|
|
CarInterface, _, _ = interfaces[candidate]
|
|
CP = CarInterface.get_params(params, candidate, fingerprints, car_fw, disable_openpilot_long, experimental_long_allowed, docs=False)
|
|
CP.carVin = vin
|
|
CP.carFw = car_fw
|
|
CP.fingerprintSource = source
|
|
CP.fuzzyFingerprint = not exact_match
|
|
|
|
return get_car_interface(CP), CP
|
|
|
|
def write_car_param(platform=MOCK.MOCK):
|
|
params = Params()
|
|
CarInterface, _, _ = interfaces[platform]
|
|
CP = CarInterface.get_non_essential_params(platform)
|
|
params.put("CarParams", CP.to_bytes())
|
|
|
|
def get_demo_car_params():
|
|
platform = MOCK.MOCK
|
|
CarInterface, _, _ = interfaces[platform]
|
|
CP = CarInterface.get_non_essential_params(platform)
|
|
return CP
|