OpenCPN Partial API docs
Loading...
Searching...
No Matches
comm_out_queue.cpp
Go to the documentation of this file.
1/***************************************************************************
2 * Copyright (C) 2022 - 2024 Alec Leamas *
3 * *
4 * This program is free software; you can redistribute it and/or modify *
5 * it under the terms of the GNU General Public License as published by *
6 * the Free Software Foundation; either version 2 of the License, or *
7 * (at your option) any later version. *
8 * *
9 * This program is distributed in the hope that it will be useful, *
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
12 * GNU General Public License for more details. *
13 * *
14 * You should have received a copy of the GNU General Public License *
15 * along with this program; if not, see <https://www.gnu.org/licenses/>. *
16 **************************************************************************/
17
#include <algorithm>
#include <cassert>
#include <cstring>
#include <stdexcept>
27
28#include <sys/types.h>
29
32#include "model/logger.h"
33
34// Both arm and intel are little endian, but better safe than sorry:
// The type key is built by copying raw message bytes into a uint64_t, so
// both the five-byte mask and the "PUBX,"/"STALK" reference values depend
// on the host byte order.
#if __BYTE_ORDER == __LITTLE_ENDIAN
static const uint64_t kFirstFiveBytes = 0x000000ffffffffff;
#define PUBX 190459303248   // "PUBX,"
#define STALK 323401897043  // "STALK"
#else
static const uint64_t kFirstFiveBytes = 0xffffffffff000000;
#define PUBX 0x505542582c000000   // "PUBX,"
#define STALK 0x5354414c4b000000  // "STALK"
#endif

/**
 * Compute a numeric key identifying the type of a NMEA0183 sentence.
 *
 * The key is made of the five characters following the start delimiter,
 * for example "GPRMC". For "PUBX," and "STALK" sentences the following
 * three characters, which hold the two digit message id, are included
 * as well.
 *
 * A leading tag block, enclosed in backslashes, is skipped together
 * with the start delimiter following it. If the tag block is not
 * terminated only the first character is skipped.
 *
 * @param line Complete sentence, possibly preceded by a tag block.
 * @return The key; lines too short to provide all bytes are handled as
 *     if padded with zero bytes, an empty line yields 0.
 */
static inline uint64_t GetNmeaType(const std::string& line) {
  size_t skipchars = 1;
  if (!line.empty() && line[0] == 0x5c) {
    // Starts with the tag block '\': skip up to and including the closing
    // '\' and then also the start delimiter.
    const size_t tag_end = line.find('\\', 1);
    if (tag_end != std::string::npos) skipchars = tag_end + 2;
  }

  // Copy at most eight bytes, never reading beyond the end of the string.
  // memcpy avoids the unaligned, aliasing-violating read a pointer cast
  // would do.
  uint64_t result = 0;
  if (skipchars < line.size()) {
    const size_t available = std::min(line.size() - skipchars, sizeof(result));
    std::memcpy(&result, line.data() + skipchars, available);
  }

  const uint64_t result5 = result & kFirstFiveBytes;
  if (result5 == PUBX || result5 == STALK) {
    /* PUBX from possibly high-speed u-blox GNSS receivers that are sure to
       overload slow connections has a 2 digit zero-padded numerical message
       ID in the first field. Similar with STALK, the two digit Seatalk
       message ID is in the first field. Both fit nicely into 8 bytes. */
    return result;
  }
  return result5;
}
74
75static void ReportOverrun(const std::string& msg, bool overrun_reported) {
76 auto& registry = CommDriverRegistry::GetInstance();
77 std::string s;
78 if (msg.length() < 6)
79 s = msg;
80 else
81 s = msg.substr(0, 5);
82 DEBUG_LOG << "CommOutQueue: Overrun on: " << msg;
83 if (!overrun_reported) registry.evt_comm_overrun.Notify(msg);
84}
85
86CommOutQueue::BufferItem::BufferItem(const std::string& _line)
87 : type(GetNmeaType(_line)),
88 line(_line),
89 stamp(std::chrono::steady_clock::now()) {}
90
91CommOutQueue::BufferItem::BufferItem(const BufferItem& other)
92 : type(other.type),
93 line(other.line),
94 stamp(std::chrono::steady_clock::now()) {}
95
96using duration_ms = std::chrono::duration<unsigned, std::milli>;
97
98CommOutQueue::CommOutQueue(unsigned max_buffered, duration_ms min_msg_gap)
99 : m_size(max_buffered - 1),
100 m_min_msg_gap(min_msg_gap),
101 m_overrun_reported(false) {
102 assert(max_buffered >= 1 && "Illegal buffer size");
103}
104
105bool CommOutQueue::push_back(const std::string& line) {
106 if (line.size() < 7) return false;
107 BufferItem item(line);
108 auto match = [item](const BufferItem& it) { return it.type == item.type; };
109
110 std::lock_guard<std::mutex> lock(m_mutex);
111 int found = std::count_if(m_buffer.begin(), m_buffer.end(), match);
112 if (found > 0) {
113 auto it = std::find_if(m_buffer.begin(), m_buffer.end(), match);
114 assert(it != m_buffer.end());
115 auto timespan = item.stamp - it->stamp;
116 if (timespan < m_min_msg_gap) {
117 m_buffer.erase(it);
118 if (m_rate_limits_logged.find(item.type) != m_rate_limits_logged.end()) {
119 m_rate_limits_logged.insert(item.type);
120 wxLogMessage("Limiting output rate for %u, message: %s", item.type,
121 line.c_str());
122 }
123 }
124 }
125 if (found > m_size) {
126 // overflow: too many of these kind of messages
127 // are still not processed. Drop so we keep m_size of them.
128 if (!m_overrun_reported) {
129 ReportOverrun(line, m_overrun_reported);
130 m_overrun_reported = true;
131 }
132 int matches = 0;
133 auto match_cnt = [&](const BufferItem& it) {
134 return it.type == item.type && matches++ >= m_size;
135 };
136 m_buffer.erase(std::remove_if(m_buffer.begin(), m_buffer.end(), match_cnt),
137 m_buffer.end());
138 }
139 m_buffer.insert(m_buffer.begin(), item);
140 return true;
141}
142
143std::string CommOutQueue::pop() {
144 std::lock_guard<std::mutex> lock(m_mutex);
145
146 if (m_buffer.size() <= 0)
147 throw std::underflow_error("Attempt to pop() from empty buffer");
148 auto item = m_buffer.back();
149 m_buffer.pop_back();
150 return item.line;
151}
152
154 std::lock_guard<std::mutex> lock(m_mutex);
155 return m_buffer.size();
156}
157
158bool CommOutQueueSingle::push_back(const std::string& line) {
159 if (line.size() < 7) return false;
160 BufferItem item(line);
161 auto match = [&item](const BufferItem& it) { return it.type == item.type; };
162
163 std::lock_guard<std::mutex> lock(m_mutex);
164 auto found = std::find_if(m_buffer.begin(), m_buffer.end(), match);
165 if (found != m_buffer.end()) {
166 // overflow: this kind of message is still not processed. Drop it
167 m_buffer.erase(std::remove_if(found, m_buffer.end(), match),
168 m_buffer.end());
169 }
170 m_buffer.push_back(item);
171 return true;
172}
173
174bool MeasuredCommOutQueue::push_back(const std::string& line) {
175 using std::chrono::duration;
176 using std::chrono::steady_clock;
177
178 auto t1 = steady_clock::now();
179 bool ok = CommOutQueue::push_back(line);
180 msg_perf[GetNmeaType(line)].in(line.size(), ok);
181 perf.in(line.size(), ok);
182 auto t2 = steady_clock::now();
183 duration<double, std::micro> us_time = t2 - t1;
184
185 push_time = 0.95 * push_time + 0.05 * us_time.count(); // LP filter.
186 return ok;
187}
188
190 using std::chrono::duration;
191 using std::chrono::steady_clock;
192
193 auto t1 = steady_clock::now();
194 // auto msg = CommOutQueue::pop(); // We need to update the perf counters,
195 // can't just pop() here
196 std::lock_guard<std::mutex> lock(m_mutex);
197 if (m_buffer.size() <= 0)
198 throw std::underflow_error("Attempt to pop() from empty buffer");
199 auto item = m_buffer.back();
200 m_buffer.pop_back();
201 perf.out(item.line.size(), item.stamp);
202 msg_perf[item.type].out(item.line.size(), item.stamp);
203 auto t2 = steady_clock::now();
204 duration<double, std::micro> us_time = t2 - t1;
205 us_time = t2 - t1;
206
207 pop_time = 0.95 * pop_time + 0.05 * us_time.count(); // LP filter.
208 return item.line;
209}
210
211std::ostream& operator<<(std::ostream& os, const MeasuredCommOutQueue& q) {
212 os << "{";
213 os << "push_time: " << q.push_time << ", ";
214 os << "pop_time: " << q.pop_time << ", ";
215 os << "perf: " << q.perf << ", ";
216 os << "msg_perf: [";
217 for (const auto& kv : q.msg_perf) {
218 os << kv.first << ": " << kv.second << ", ";
219 }
220 os << "]";
221 os << "}";
222 return os;
223};
224
225std::ostream& operator<<(std::ostream& os, const PerfCounter& pc) {
226 os << "{";
227 os << "msgs_in: " << pc.msgs_in << ", ";
228 os << "msgs_out: " << pc.msgs_out << ", ";
229 os << "bytes_in: " << pc.bytes_in << ", ";
230 os << "bytes_out: " << pc.bytes_out << ", ";
231 os << "bps_in: " << pc.bps_in << ", ";
232 os << "mps_in: " << pc.mps_in << ", ";
233 os << "bps_out: " << pc.bps_out << ", ";
234 os << "mps_out: " << pc.mps_out << ", ";
235 os << "in_out_delay_us: " << pc.in_out_delay_us << ", ";
236 os << "overflow_msgs: " << pc.overflow_msgs << ", ";
237 os << "in_queue: " << pc.in_queue;
238 os << "}";
239 return os;
240};
bool push_back(const std::string &line) override
Insert line of NMEA0183 data in buffer.
virtual int size() const
Return number of lines in queue.
virtual std::string pop()
Return next line to send and remove it from buffer, throws exception if empty.
CommOutQueue()
Default buffer, allows 10 buffered messages of each type, applies rate limits when repeated with less...
virtual bool push_back(const std::string &line)
Insert valid line of NMEA0183 data in buffer.
Add unit test measurements to CommOutQueue.
bool push_back(const std::string &line) override
Insert valid line of NMEA0183 data in buffer.
std::string pop() override
Return next line to send and remove it from buffer, throws exception if empty.
Driver registration container, a singleton.
Communications output queue.
Enhanced logging interface on top of wx/log.h.