carrot/opendbc_repo/opendbc/car/panda_runner.py
Vehicle Researcher 4fca6dec8e openpilot v0.9.8 release
date: 2025-01-29T09:09:56
master commit: 227bb68e1891619b360b89809e6822d50d34228f
2025-01-29 09:09:58 +00:00

58 lines
2.0 KiB
Python

import time
from contextlib import AbstractContextManager
from panda import Panda
from opendbc.car.car_helpers import get_car
from opendbc.car.can_definitions import CanData
from opendbc.car.structs import CarParams, CarControl
class PandaRunner(AbstractContextManager):
def __enter__(self):
self.p = Panda()
self.p.reset()
# setup + fingerprinting
self.p.set_safety_mode(Panda.SAFETY_ELM327, 1)
self.CI = get_car(self._can_recv, self.p.can_send_many, self.p.set_obd, True)
assert self.CI.CP.carFingerprint.lower() != "mock", "Unable to identify car. Check connections and ensure car is supported."
safety_model = CarParams.SafetyModel.schema.enumerants[self.CI.CP.safetyConfigs[0].safetyModel]
self.p.set_safety_mode(Panda.SAFETY_ELM327, 1)
self.CI.init(self.CI.CP, self._can_recv, self.p.can_send_many)
self.p.set_safety_mode(safety_model, self.CI.CP.safetyConfigs[0].safetyParam)
return self
def __exit__(self, exc_type, exc_value, traceback):
self.p.set_safety_mode(Panda.SAFETY_NOOUTPUT)
self.p.reset() # avoid siren
return super().__exit__(exc_type, exc_value, traceback)
@property
def panda(self) -> Panda:
return self.p
def _can_recv(self, wait_for_one: bool = False) -> list[list[CanData]]:
recv = self.p.can_recv()
while len(recv) == 0 and wait_for_one:
recv = self.p.can_recv()
return [[CanData(addr, dat, bus) for addr, dat, bus in recv], ]
def read(self, strict: bool = True):
cs = self.CI.update([int(time.monotonic()*1e9), self._can_recv()[0]])
if strict:
assert cs.canValid, "CAN went invalid, check connections"
return cs
def write(self, cc: CarControl) -> None:
if cc.enabled and not self.p.health()['controls_allowed']:
# prevent the car from faulting. print a warning?
cc = CarControl(enabled=False)
_, can_sends = self.CI.apply(cc)
self.p.can_send_many(can_sends, timeout=25)
self.p.send_heartbeat()
if __name__ == "__main__":
with PandaRunner() as p:
print(p.read())