107 lines
4.2 KiB
Python
Raw Normal View History

import json
import os
import shutil
import subprocess
from typing import Literal
from openpilot.system.hardware.base import LPABase, LPAError, LPAProfileNotFoundError, Profile
class TiciLPA(LPABase):
def __init__(self, interface: Literal['qmi', 'at'] = 'qmi'):
self.env = os.environ.copy()
self.env['LPAC_APDU'] = interface
self.env['QMI_DEVICE'] = '/dev/cdc-wdm0'
self.env['AT_DEVICE'] = '/dev/ttyUSB2'
self.timeout_sec = 45
if shutil.which('lpac') is None:
raise LPAError('lpac not found, must be installed!')
def list_profiles(self) -> list[Profile]:
msgs = self._invoke('profile', 'list')
self._validate_successful(msgs)
return [Profile(
iccid=p['iccid'],
nickname=p['profileNickname'],
enabled=p['profileState'] == 'enabled',
provider=p['serviceProviderName']
) for p in msgs[-1]['payload']['data']]
def get_active_profile(self) -> Profile | None:
return next((p for p in self.list_profiles() if p.enabled), None)
def delete_profile(self, iccid: str) -> None:
self._validate_profile_exists(iccid)
latest = self.get_active_profile()
if latest is not None and latest.iccid == iccid:
raise LPAError('cannot delete active profile, switch to another profile first')
self._validate_successful(self._invoke('profile', 'delete', iccid))
self._process_notifications()
def download_profile(self, qr: str, nickname: str | None = None) -> None:
msgs = self._invoke('profile', 'download', '-a', qr)
self._validate_successful(msgs)
new_profile = next((m for m in msgs if m['payload']['message'] == 'es8p_meatadata_parse'), None)
if new_profile is None:
raise LPAError('no new profile found')
if nickname:
self.nickname_profile(new_profile['payload']['data']['iccid'], nickname)
self._process_notifications()
def nickname_profile(self, iccid: str, nickname: str) -> None:
self._validate_profile_exists(iccid)
self._validate_successful(self._invoke('profile', 'nickname', iccid, nickname))
def switch_profile(self, iccid: str) -> None:
self._validate_profile_exists(iccid)
latest = self.get_active_profile()
if latest and latest.iccid == iccid:
return
self._validate_successful(self._invoke('profile', 'enable', iccid))
self._process_notifications()
def _invoke(self, *cmd: str):
proc = subprocess.Popen(['sudo', '-E', 'lpac'] + list(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.env)
try:
out, err = proc.communicate(timeout=self.timeout_sec)
except subprocess.TimeoutExpired as e:
proc.kill()
raise LPAError(f"lpac {cmd} timed out after {self.timeout_sec} seconds") from e
messages = []
for line in out.decode().strip().splitlines():
if line.startswith('{'):
message = json.loads(line)
# lpac response format validations
assert 'type' in message, 'expected type in message'
assert message['type'] == 'lpa' or message['type'] == 'progress', 'expected lpa or progress message type'
assert 'payload' in message, 'expected payload in message'
assert 'code' in message['payload'], 'expected code in message payload'
assert 'data' in message['payload'], 'expected data in message payload'
msg_ret_code = message['payload']['code']
if msg_ret_code != 0:
raise LPAError(f"lpac {' '.join(cmd)} failed with code {msg_ret_code}: <{message['payload']['message']}> {message['payload']['data']}")
messages.append(message)
if len(messages) == 0:
raise LPAError(f"lpac {cmd} returned no messages")
return messages
def _process_notifications(self) -> None:
"""
Process notifications stored on the eUICC, typically to activate/deactivate the profile with the carrier.
"""
self._validate_successful(self._invoke('notification', 'process', '-a', '-r'))
def _validate_profile_exists(self, iccid: str) -> None:
if not any(p.iccid == iccid for p in self.list_profiles()):
raise LPAProfileNotFoundError(f'profile {iccid} does not exist')
def _validate_successful(self, msgs: list[dict]) -> None:
assert msgs[-1]['payload']['message'] == 'success', 'expected success notification'