carrot/tools/replay/seg_mgr.cc
Vehicle Researcher 4fca6dec8e openpilot v0.9.8 release
date: 2025-01-29T09:09:56
master commit: 227bb68e1891619b360b89809e6822d50d34228f
2025-01-29 09:09:58 +00:00

141 lines
4.5 KiB
C++

#include "tools/replay/seg_mgr.h"
#include <algorithm>
SegmentManager::~SegmentManager() {
{
std::unique_lock lock(mutex_);
exit_ = true;
}
cv_.notify_one();
if (thread_.joinable()) thread_.join();
}
bool SegmentManager::load() {
if (!route_.load()) {
rError("failed to load route: %s", route_.name().c_str());
return false;
}
for (const auto &[n, file] : route_.segments()) {
if (!file.rlog.empty() || !file.qlog.empty()) {
segments_.insert({n, nullptr});
}
}
if (segments_.empty()) {
rInfo("no valid segments in route: %s", route_.name().c_str());
return false;
}
rInfo("loaded route %s with %zu valid segments", route_.name().c_str(), segments_.size());
thread_ = std::thread(&SegmentManager::manageSegmentCache, this);
return true;
}
void SegmentManager::setCurrentSegment(int seg_num) {
{
std::unique_lock lock(mutex_);
if (cur_seg_num_ == seg_num) return;
cur_seg_num_ = seg_num;
needs_update_ = true;
}
cv_.notify_one();
}
void SegmentManager::manageSegmentCache() {
while (true) {
std::unique_lock lock(mutex_);
cv_.wait(lock, [this]() { return exit_ || needs_update_; });
if (exit_) break;
needs_update_ = false;
auto cur = segments_.lower_bound(cur_seg_num_);
if (cur == segments_.end()) continue;
// Calculate the range of segments to load
auto begin = std::prev(cur, std::min<int>(segment_cache_limit_ / 2, std::distance(segments_.begin(), cur)));
auto end = std::next(begin, std::min<int>(segment_cache_limit_, std::distance(begin, segments_.end())));
begin = std::prev(end, std::min<int>(segment_cache_limit_, std::distance(segments_.begin(), end)));
lock.unlock();
loadSegmentsInRange(begin, cur, end);
bool merged = mergeSegments(begin, end);
// Free segments outside the current range
std::for_each(segments_.begin(), begin, [](auto &segment) { segment.second.reset(); });
std::for_each(end, segments_.end(), [](auto &segment) { segment.second.reset(); });
if (merged && onSegmentMergedCallback_) {
onSegmentMergedCallback_(); // Notify listener that segments have been merged
}
}
}
bool SegmentManager::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end) {
std::set<int> segments_to_merge;
size_t total_event_count = 0;
for (auto it = begin; it != end; ++it) {
const auto &segment = it->second;
if (segment && segment->getState() == Segment::LoadState::Loaded) {
segments_to_merge.insert(segment->seg_num);
total_event_count += segment->log->events.size();
}
}
if (segments_to_merge == merged_segments_) return false;
auto merged_event_data = std::make_shared<EventData>();
auto &merged_events = merged_event_data->events;
merged_events.reserve(total_event_count);
rDebug("merging segments: %s", join(segments_to_merge, ", ").c_str());
for (int n : segments_to_merge) {
const auto &events = segments_.at(n)->log->events;
if (events.empty()) continue;
// Skip INIT_DATA if present
auto events_begin = (events.front().which == cereal::Event::Which::INIT_DATA) ? std::next(events.begin()) : events.begin();
size_t previous_size = merged_events.size();
merged_events.insert(merged_events.end(), events_begin, events.end());
std::inplace_merge(merged_events.begin(), merged_events.begin() + previous_size, merged_events.end());
merged_event_data->segments[n] = segments_.at(n);
}
std::atomic_store(&event_data_, std::move(merged_event_data));
merged_segments_ = segments_to_merge;
return true;
}
void SegmentManager::loadSegmentsInRange(SegmentMap::iterator begin, SegmentMap::iterator cur, SegmentMap::iterator end) {
auto tryLoadSegment = [this](auto first, auto last) {
for (auto it = first; it != last; ++it) {
auto &segment_ptr = it->second;
if (!segment_ptr) {
segment_ptr = std::make_shared<Segment>(
it->first, route_.at(it->first), flags_, filters_,
[this](int seg_num, bool success) {
std::unique_lock lock(mutex_);
needs_update_ = true;
cv_.notify_one();
});
}
if (segment_ptr->getState() == Segment::LoadState::Loading) {
return true; // Segment is still loading
}
}
return false; // No segments need loading
};
// Try forward loading, then reverse if necessary
if (!tryLoadSegment(cur, end)) {
tryLoadSegment(std::make_reverse_iterator(cur), std::make_reverse_iterator(begin));
}
}