OpenCPN Partial API docs
Loading...
Searching...
No Matches
comm_out_queue.cpp
1#include <algorithm>
2#include <cassert>
3#include <stdexcept>
4
5#include <sys/types.h>
6
8#include "model/comm_out_queue.h"
9#include "model/logger.h"
10
// Message type keys are packed explicitly as little-endian integers (first
// character in the least significant byte). The mask and the constants below
// are therefore valid regardless of the host byte order.
static const uint64_t kFirstFiveBytes = 0x000000ffffffffffULL;

#define PUBX 190459303248   // "PUBX," packed little-endian
#define STALK 323401897043  // "STALK" packed little-endian

/**
 * Compute the key used to decide whether two lines are of the same kind.
 *
 * The key is built from the characters following the start delimiter
 * (the first character of the line, or the first character after a leading
 * tag block "\...\"). Normally the first five characters are used. For lines
 * starting with "PUBX," or "STALK" all eight characters are used so that the
 * two-digit message id in the first field becomes part of the key.
 *
 * Lines too short to provide eight characters are zero padded, so the
 * function never reads outside the string.
 *
 * @param line Line to classify.
 * @return Characters packed into an integer, first character in the least
 *         significant byte.
 */
static inline uint64_t GetNmeaType(const std::string& line) {
  size_t skipchars = 1;  // Skip the start delimiter
  if (!line.empty() && line[0] == '\\') {
    // Starts with a tag block: skip up to and including the closing '\',
    // and then also the start delimiter.
    const size_t tag_end = line.find('\\', 1);
    if (tag_end != std::string::npos) skipchars = tag_end + 2;
    // Otherwise there is no end of the tag block. This should never happen,
    // but just in case we fall back to skipping a single character.
  }

  // Pack byte by byte instead of casting the string data to uint64_t*: no
  // unaligned access, no aliasing violation and no read beyond the string.
  uint64_t result = 0;
  if (skipchars < line.size()) {
    const size_t count = std::min(sizeof(result), line.size() - skipchars);
    for (size_t i = 0; i < count; ++i) {
      const auto byte = static_cast<unsigned char>(line[skipchars + i]);
      result |= static_cast<uint64_t>(byte) << (8 * i);
    }
  }

  const uint64_t result5 = result & kFirstFiveBytes;
  if (result5 == PUBX || result5 == STALK) {
    /* PUBX from possibly high-speed u-blox GNSS receivers that are sure to
       overload slow connections has a 2 digit zero-padded numerical message
       ID in the first field. Similar with STALK, the two digit Seatalk
       message ID is in the first field. Both fit nicely into 8 bytes. */
    return result;
  }
  return result5;
}
51
52static void ReportOverrun(const std::string& msg, bool overrun_reported) {
53 auto& registry = CommDriverRegistry::GetInstance();
54 std::string s;
55 if (msg.length() < 6)
56 s = msg;
57 else
58 s = msg.substr(0, 5);
59 DEBUG_LOG << "CommOutQueue: Overrun on: " << msg;
60 if (!overrun_reported) registry.evt_comm_overrun.Notify(msg);
61}
62
63CommOutQueue::BufferItem::BufferItem(const std::string& _line)
64 : type(GetNmeaType(_line)),
65 line(_line),
66 stamp(std::chrono::steady_clock::now()) {}
67
68CommOutQueue::BufferItem::BufferItem(const BufferItem& other)
69 : type(other.type),
70 line(other.line),
71 stamp(std::chrono::steady_clock::now()) {}
72
73using duration_ms = std::chrono::duration<unsigned, std::milli>;
74
75CommOutQueue::CommOutQueue(unsigned max_buffered, duration_ms min_msg_gap)
76 : m_size(max_buffered - 1),
77 m_min_msg_gap(min_msg_gap),
78 m_overrun_reported(false) {
79 assert(max_buffered >= 1 && "Illegal buffer size");
80}
81
82bool CommOutQueue::push_back(const std::string& line) {
83 if (line.size() < 7) return false;
84 BufferItem item(line);
85 auto match = [item](const BufferItem& it) { return it.type == item.type; };
86
87 std::lock_guard<std::mutex> lock(m_mutex);
88 int found = std::count_if(m_buffer.begin(), m_buffer.end(), match);
89 if (found > 0) {
90 auto it = std::find_if(m_buffer.begin(), m_buffer.end(), match);
91 assert(it != m_buffer.end());
92 auto timespan = item.stamp - it->stamp;
93 if (timespan < m_min_msg_gap) {
94 m_buffer.erase(it);
95 if (m_rate_limits_logged.find(item.type) != m_rate_limits_logged.end()) {
96 m_rate_limits_logged.insert(item.type);
97 wxLogMessage("Limiting output rate for %u, message: %s", item.type,
98 line.c_str());
99 }
100 }
101 }
102 if (found > m_size) {
103 // overflow: too many of these kind of messages
104 // are still not processed. Drop so we keep m_size of them.
105 if (!m_overrun_reported) {
106 ReportOverrun(line, m_overrun_reported);
107 m_overrun_reported = true;
108 }
109 int matches = 0;
110 auto match_cnt = [&](const BufferItem& it) {
111 return it.type == item.type && matches++ >= m_size;
112 };
113 m_buffer.erase(std::remove_if(m_buffer.begin(), m_buffer.end(), match_cnt),
114 m_buffer.end());
115 }
116 m_buffer.insert(m_buffer.begin(), item);
117 return true;
118}
119
120std::string CommOutQueue::pop() {
121 std::lock_guard<std::mutex> lock(m_mutex);
122
123 if (m_buffer.size() <= 0)
124 throw std::underflow_error("Attempt to pop() from empty buffer");
125 auto item = m_buffer.back();
126 m_buffer.pop_back();
127 return item.line;
128}
129
131 std::lock_guard<std::mutex> lock(m_mutex);
132 return m_buffer.size();
133}
134
135bool CommOutQueueSingle::push_back(const std::string& line) {
136 if (line.size() < 7) return false;
137 BufferItem item(line);
138 auto match = [&item](const BufferItem& it) { return it.type == item.type; };
139
140 std::lock_guard<std::mutex> lock(m_mutex);
141 auto found = std::find_if(m_buffer.begin(), m_buffer.end(), match);
142 if (found != m_buffer.end()) {
143 // overflow: this kind of message is still not processed. Drop it
144 m_buffer.erase(std::remove_if(found, m_buffer.end(), match),
145 m_buffer.end());
146 }
147 m_buffer.push_back(item);
148 return true;
149}
150
151bool MeasuredCommOutQueue::push_back(const std::string& line) {
152 using std::chrono::duration;
153 using std::chrono::steady_clock;
154
155 auto t1 = steady_clock::now();
156 bool ok = CommOutQueue::push_back(line);
157 msg_perf[GetNmeaType(line)].in(line.size(), ok);
158 perf.in(line.size(), ok);
159 auto t2 = steady_clock::now();
160 duration<double, std::micro> us_time = t2 - t1;
161
162 push_time = 0.95 * push_time + 0.05 * us_time.count(); // LP filter.
163 return ok;
164}
165
167 using std::chrono::duration;
168 using std::chrono::steady_clock;
169
170 auto t1 = steady_clock::now();
171 // auto msg = CommOutQueue::pop(); // We need to update the perf counters,
172 // can't just pop() here
173 std::lock_guard<std::mutex> lock(m_mutex);
174 if (m_buffer.size() <= 0)
175 throw std::underflow_error("Attempt to pop() from empty buffer");
176 auto item = m_buffer.back();
177 m_buffer.pop_back();
178 perf.out(item.line.size(), item.stamp);
179 msg_perf[item.type].out(item.line.size(), item.stamp);
180 auto t2 = steady_clock::now();
181 duration<double, std::micro> us_time = t2 - t1;
182 us_time = t2 - t1;
183
184 pop_time = 0.95 * pop_time + 0.05 * us_time.count(); // LP filter.
185 return item.line;
186}
187
188std::ostream& operator<<(std::ostream& os, const MeasuredCommOutQueue& q) {
189 os << "{";
190 os << "push_time: " << q.push_time << ", ";
191 os << "pop_time: " << q.pop_time << ", ";
192 os << "perf: " << q.perf << ", ";
193 os << "msg_perf: [";
194 for (const auto& kv : q.msg_perf) {
195 os << kv.first << ": " << kv.second << ", ";
196 }
197 os << "]";
198 os << "}";
199 return os;
200};
201
202std::ostream& operator<<(std::ostream& os, const PerfCounter& pc) {
203 os << "{";
204 os << "msgs_in: " << pc.msgs_in << ", ";
205 os << "msgs_out: " << pc.msgs_out << ", ";
206 os << "bytes_in: " << pc.bytes_in << ", ";
207 os << "bytes_out: " << pc.bytes_out << ", ";
208 os << "bps_in: " << pc.bps_in << ", ";
209 os << "mps_in: " << pc.mps_in << ", ";
210 os << "bps_out: " << pc.bps_out << ", ";
211 os << "mps_out: " << pc.mps_out << ", ";
212 os << "in_out_delay_us: " << pc.in_out_delay_us << ", ";
213 os << "overflow_msgs: " << pc.overflow_msgs << ", ";
214 os << "in_queue: " << pc.in_queue;
215 os << "}";
216 return os;
217};
bool push_back(const std::string &line) override
Insert line of NMEA0183 data in buffer.
virtual int size() const
Return number of lines in queue.
virtual std::string pop()
Return next line to send and remove it from buffer, throws exception if empty.
CommOutQueue()
Default buffer, allows 10 buffered messages of each type, applies rate limits when repeated with less...
virtual bool push_back(const std::string &line)
Insert valid line of NMEA0183 data in buffer.
Add unit test measurements to CommOutQueue.
bool push_back(const std::string &line) override
Insert valid line of NMEA0183 data in buffer.
std::string pop() override
Return next line to send and remove it from buffer, throws exception if empty.
Driver registration container, a singleton.
Enhanced logging interface on top of wx/log.h.