/**
 * Mask selecting the first five bytes, in memory order, of a value loaded as
 * uint64_t. Two definitions are visible, one per host byte order.
 */
35#if __BYTE_ORDER == __LITTLE_ENDIAN
// Little-endian hosts: the first five bytes in memory are the five least
// significant bytes of the loaded value.
36static const uint64_t kFirstFiveBytes = 0x000000ffffffffff;
// Variant with the five most significant bytes set.
38static const uint64_t kFirstFiveBytes = 0xffffffffff000000;
// 190459303248 == 0x2C58425550, i.e. the ASCII codes of P, U, B, X and a comma
// packed with the first character in the least significant byte.
// NOTE(review): both tag constants use little-endian packing only; confirm
// whether big-endian hosts need byte-swapped values.
41#define PUBX 190459303248
// 323401897043 == 0x4B4C415453, i.e. the ASCII codes of S, T, A, L, K packed
// the same way.
42#define STALK 323401897043
/**
 * Derive a 64-bit type value from the leading bytes of a message line.
 *
 * Eight bytes starting at an offset into the line are loaded as a uint64_t
 * and masked with kFirstFiveBytes; the masked value is then compared with the
 * PUBX and STALK constants.
 *
 * @param line  Message text; line[0] is read unconditionally.
 */
52static inline uint64_t GetNmeaType(
const std::string& line) {
// 0x5c is the backslash character. For such lines the offset is taken from
// the position of the first comma found at index 1 or later.
54 if (line[0] == 0x5c) {
56 skipchars = line.find(
',', 1);
// No comma present in the line.
57 if (skipchars == std::string::npos) {
// NOTE(review): this loads 8 bytes through a uint64_t pointer into the
// string buffer, regardless of how many characters remain after skipchars
// and without any alignment guarantee. Copying with memcpy into a local
// uint64_t would avoid both concerns; confirm callers guarantee the length.
62 uint64_t result = *
reinterpret_cast<const uint64_t*
>(&line[skipchars]);
// Keep only the first five bytes for the comparison below.
63 uint64_t result5 = result & kFirstFiveBytes;
64 if (result5 == PUBX || result5 == STALK) {
/**
 * Log a queue overrun and, unless one was already reported, notify listeners
 * through the driver registry.
 *
 * @param msg               Text appended to the debug log entry and passed to
 *                          the notification.
 * @param overrun_reported  When true, the evt_comm_overrun notification is
 *                          skipped.
 */
75static void ReportOverrun(
const std::string& msg,
bool overrun_reported) {
76 auto& registry = CommDriverRegistry::GetInstance();
82 DEBUG_LOG <<
"CommOutQueue: Overrun on: " << msg;
// Notify only for the first overrun, as indicated by the caller.
83 if (!overrun_reported) registry.evt_comm_overrun.Notify(msg);
/**
 * Build a buffer item from a message line: type is computed by GetNmeaType()
 * and stamp is set to the current steady_clock time.
 *
 * @param _line  Message text handed to GetNmeaType().
 */
86CommOutQueue::BufferItem::BufferItem(
const std::string& _line)
87 : type(GetNmeaType(_line)),
89 stamp(std::chrono::steady_clock::now()) {}
/**
 * Copy constructor.
 *
 * NOTE(review): stamp is set to the current steady_clock time instead of
 * other.stamp, so every copy of an item carries the time of the copy.
 * Confirm that re-stamping on copy is intended.
 */
91CommOutQueue::BufferItem::BufferItem(
const BufferItem& other)
94 stamp(std::chrono::steady_clock::now()) {}
/** Duration counted in milliseconds, stored as an unsigned integer. */
96using duration_ms = std::chrono::duration<unsigned, std::milli>;
// Member initialisers: m_size is one less than max_buffered, m_min_msg_gap is
// taken from min_msg_gap, and the overrun flag starts cleared.
99 : m_size(max_buffered - 1),
100 m_min_msg_gap(min_msg_gap),
101 m_overrun_reported(false) {
// NOTE(review): assert is compiled out when NDEBUG is defined, so a
// max_buffered below 1 is not rejected in such builds.
102 assert(max_buffered >= 1 &&
"Illegal buffer size");
// Lines shorter than 7 characters are rejected.
106 if (line.size() < 7)
return false;
// Predicate: the buffered item has the same type as the new item, which is
// captured by value.
108 auto match = [item](
const BufferItem& it) {
return it.type == item.type; };
// The buffer is inspected and modified below while holding m_mutex.
110 std::lock_guard<std::mutex> lock(m_mutex);
// Number of buffered items with the same type as the new one.
111 int found = std::count_if(m_buffer.begin(), m_buffer.end(), match);
// First buffered item of the same type, searching from the front.
113 auto it = std::find_if(m_buffer.begin(), m_buffer.end(), match);
114 assert(it != m_buffer.end());
115 auto timespan = item.stamp - it->stamp;
// The new item follows the matched one by less than the minimum gap.
116 if (timespan < m_min_msg_gap) {
// NOTE(review): this condition looks inverted. The insert and the log call
// run only when the type is ALREADY present in m_rate_limits_logged, so a
// type that was never logged is never added through this path. Comparing
// with == would log once per type; confirm the intent.
118 if (m_rate_limits_logged.find(item.type) != m_rate_limits_logged.end()) {
119 m_rate_limits_logged.insert(item.type);
// NOTE(review): item.type is initialised from a uint64_t (GetNmeaType) but is
// formatted with %u; confirm the format specifier matches the member type.
120 wxLogMessage(
"Limiting output rate for %u, message: %s", item.type,
// More than m_size items of this type are already buffered.
125 if (found > m_size) {
// The overrun is reported only while the flag is still clear.
128 if (!m_overrun_reported) {
129 ReportOverrun(line, m_overrun_reported);
130 m_overrun_reported =
true;
// Counting predicate: true for a same-type item once m_size same-type items
// have already been seen.
134 return it.type == item.type && matches++ >= m_size;
136 m_buffer.erase(std::remove_if(m_buffer.begin(), m_buffer.end(), match_cnt),
// The new item is placed at the front of the buffer.
139 m_buffer.insert(m_buffer.begin(), item);
// The buffer is accessed while holding m_mutex.
144 std::lock_guard<std::mutex> lock(m_mutex);
// An empty buffer is reported to the caller as std::underflow_error.
// NOTE(review): assuming m_buffer is a standard container, size() is unsigned
// and the comparison is equivalent to empty().
146 if (m_buffer.size() <= 0)
147 throw std::underflow_error(
"Attempt to pop() from empty buffer");
// The item at the back of the buffer is the one taken.
148 auto item = m_buffer.back();
// The item count is read while holding m_mutex.
154 std::lock_guard<std::mutex> lock(m_mutex);
155 return m_buffer.size();
// Lines shorter than 7 characters are rejected.
159 if (line.size() < 7)
return false;
// Predicate: the buffered item has the same type as the new item, which is
// captured by reference.
161 auto match = [&item](
const BufferItem& it) {
return it.type == item.type; };
// The buffer is inspected and modified below while holding m_mutex.
163 std::lock_guard<std::mutex> lock(m_mutex);
// Locate the first buffered item of the same type, if any.
164 auto found = std::find_if(m_buffer.begin(), m_buffer.end(), match);
165 if (found != m_buffer.end()) {
// Same-type items from the first match onward are removed.
167 m_buffer.erase(std::remove_if(found, m_buffer.end(), match),
// The new item is appended at the back of the buffer.
// NOTE(review): the pop() code visible in this file reads m_buffer.back(); if
// this queue is drained by that code the newest item is sent first. Confirm
// the intended ordering.
170 m_buffer.push_back(item);
175 using std::chrono::duration;
176 using std::chrono::steady_clock;
// Start of the timed section.
178 auto t1 = steady_clock::now();
// Record the incoming line size and the ok result in the per-type counter and
// in the overall counter.
// NOTE(review): GetNmeaType() is called here whatever the line length; confirm
// that short lines cannot reach its 8-byte load.
180 msg_perf[GetNmeaType(line)].in(line.size(), ok);
181 perf.in(line.size(), ok);
182 auto t2 = steady_clock::now();
// Elapsed time in microseconds, as a double.
183 duration<double, std::micro> us_time = t2 - t1;
// Exponentially weighted moving average: 95 % previous value, 5 % new sample.
185 push_time = 0.95 * push_time + 0.05 * us_time.count();
190 using std::chrono::duration;
191 using std::chrono::steady_clock;
// Start of the timed section.
193 auto t1 = steady_clock::now();
// The buffer is accessed while holding m_mutex.
196 std::lock_guard<std::mutex> lock(m_mutex);
// An empty buffer is reported to the caller as std::underflow_error.
197 if (m_buffer.size() <= 0)
198 throw std::underflow_error(
"Attempt to pop() from empty buffer");
// The item at the back of the buffer is the one taken.
// NOTE(review): item is a copy, and the BufferItem copy constructor visible in
// this file sets stamp to the current time. item.stamp passed to the counters
// below would then be the copy time rather than the enqueue time; confirm.
199 auto item = m_buffer.back();
// Record the outgoing item in the overall counter and in the per-type counter.
201 perf.out(item.line.size(), item.stamp);
202 msg_perf[item.type].out(item.line.size(), item.stamp);
203 auto t2 = steady_clock::now();
// Elapsed time in microseconds, as a double.
204 duration<double, std::micro> us_time = t2 - t1;
// Exponentially weighted moving average: 95 % previous value, 5 % new sample.
207 pop_time = 0.95 * pop_time + 0.05 * us_time.count();
// Write the timing averages and the overall counter as name: value pairs,
// each followed by a comma and a space.
213 os <<
"push_time: " << q.push_time <<
", ";
214 os <<
"pop_time: " << q.pop_time <<
", ";
215 os <<
"perf: " << q.perf <<
", ";
// Then write every entry of msg_perf as key: value, in the iteration order of
// the container.
217 for (
const auto& kv : q.msg_perf) {
218 os << kv.first <<
": " << kv.second <<
", ";
/**
 * Write the fields of a PerfCounter to a stream as name: value pairs
 * separated by a comma and a space.
 *
 * @param os  Stream that receives the text.
 * @param pc  Counter whose fields are written.
 */
225std::ostream& operator<<(std::ostream& os,
const PerfCounter& pc) {
227 os <<
"msgs_in: " << pc.msgs_in <<
", ";
228 os <<
"msgs_out: " << pc.msgs_out <<
", ";
229 os <<
"bytes_in: " << pc.bytes_in <<
", ";
230 os <<
"bytes_out: " << pc.bytes_out <<
", ";
231 os <<
"bps_in: " << pc.bps_in <<
", ";
232 os <<
"mps_in: " << pc.mps_in <<
", ";
233 os <<
"bps_out: " << pc.bps_out <<
", ";
234 os <<
"mps_out: " << pc.mps_out <<
", ";
235 os <<
"in_out_delay_us: " << pc.in_out_delay_us <<
", ";
236 os <<
"overflow_msgs: " << pc.overflow_msgs <<
", ";
// The in_queue field is written without a trailing separator.
237 os <<
"in_queue: " << pc.in_queue;
bool push_back(const std::string &line) override
Insert line of NMEA0183 data in buffer.
virtual int size() const
Return number of lines in queue.
virtual std::string pop()
Return the next line to send and remove it from the buffer; throws std::underflow_error if the buffer is empty.
CommOutQueue()
Default buffer, allows 10 buffered messages of each type, applies rate limits when repeated with less...
virtual bool push_back(const std::string &line)
Insert valid line of NMEA0183 data in buffer.
Add unit test measurements to CommOutQueue.
bool push_back(const std::string &line) override
Insert valid line of NMEA0183 data in buffer.
std::string pop() override
Return the next line to send and remove it from the buffer; throws std::underflow_error if the buffer is empty.
Driver registration container, a singleton.
Communications output queue.
Enhanced logging interface on top of wx/log.h.