OpenCPN Partial API docs
Loading...
Searching...
No Matches
comm_out_queue.h
Go to the documentation of this file.
1/***************************************************************************
2 * Copyright (C) 2022 - 2024 Alec Leamas *
3 * *
4 * This program is free software; you can redistribute it and/or modify *
5 * it under the terms of the GNU General Public License as published by *
6 * the Free Software Foundation; either version 2 of the License, or *
7 * (at your option) any later version. *
8 * *
9 * This program is distributed in the hope that it will be useful, *
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
12 * GNU General Public License for more details. *
13 * *
14 * You should have received a copy of the GNU General Public License *
15 * along with this program; if not, see <https://www.gnu.org/licenses/>. *
16 **************************************************************************/
17
24#ifndef COMM_OUT_QUEUE_H_
25#define COMM_OUT_QUEUE_H_
26
27#include <chrono>
28#include <cstdint>
29#include <mutex>
30#include <set>
31#include <string>
32#include <unordered_map>
33#include <vector>
34
35using namespace std::literals::chrono_literals;
36
38public:
40 : msgs_in(0),
41 msgs_out(0),
42 bytes_in(0),
43 bytes_out(0),
44 bps_in(0),
45 mps_in(0),
46 bps_out(0),
47 mps_out(0),
48 in_out_delay_us(0),
49 overflow_msgs(0),
50 in_queue(0) {}
51
52 void in(const size_t bytes, bool ok) {
53 auto t1 = std::chrono::steady_clock::now();
54 std::chrono::duration<double, std::micro> us_time = t1 - last_in;
55 bps_in = 0.95 * bps_in + 0.05 * bytes * 1000000 / us_time.count();
56 mps_in = 0.95 * bps_in + 0.05 * 1000000 / us_time.count();
57 msgs_in++;
58 bytes_in += bytes;
59 last_in = t1;
60 if (!ok) {
61 overflow_msgs++;
62 }
63 in_queue++;
64 }
65
66 void out(const size_t bytes,
67 std::chrono::time_point<std::chrono::steady_clock> in_ts) {
68 auto t1 = std::chrono::steady_clock::now();
69 std::chrono::duration<double, std::micro> us_time = t1 - last_in;
70 bps_out = 0.95 * bps_out + 0.05 * bytes * 1000000 / us_time.count();
71 mps_out = 0.95 * bps_out + 0.05 * 1000000 / us_time.count();
72 us_time = t1 - in_ts;
73 in_out_delay_us = 0.95 * in_out_delay_us + 0.05 * us_time.count();
74 msgs_out++;
75 bytes_out += bytes;
76 last_out = t1;
77 in_queue--;
78 }
79
80 size_t msgs_in;
81 size_t msgs_out;
82 size_t bytes_in;
83 size_t bytes_out;
84 uint32_t bps_in;
85 double mps_in;
86 uint32_t bps_out;
87 double mps_out;
88 size_t in_out_delay_us;
89 size_t overflow_msgs;
90 size_t in_queue;
91 std::chrono::time_point<std::chrono::steady_clock> last_in;
92 std::chrono::time_point<std::chrono::steady_clock> last_out;
93};
94
95std::ostream& operator<<(std::ostream& os, const PerfCounter& pc);
96
102public:
107 virtual bool push_back(const std::string& line);
108
113 virtual std::string pop();
114
116 virtual int size() const;
117
127 CommOutQueue(unsigned max_buffered,
128 std::chrono::duration<unsigned, std::milli> min_msg_gap);
133 CommOutQueue(unsigned max_buffered) : CommOutQueue(max_buffered, 0ms) {}
134
140
141 // Disable copying and assignment
142 CommOutQueue(const CommOutQueue& other) = delete;
143 CommOutQueue& operator=(const CommOutQueue&) = delete;
144
145 virtual ~CommOutQueue() = default;
146
147protected:
148 struct BufferItem {
149 uint64_t type;
150 std::string line;
151 BufferItem(const std::string& line);
152 BufferItem(const BufferItem& other);
153 std::chrono::time_point<std::chrono::steady_clock> stamp;
154 };
155
156 std::vector<BufferItem> m_buffer;
157 mutable std::mutex m_mutex;
158 int m_size;
159 using duration_ms = std::chrono::duration<unsigned, std::milli>;
160 duration_ms m_min_msg_gap;
161 bool m_overrun_reported;
162 std::set<uint64_t> m_rate_limits_logged;
163 ;
164};
165
168public:
169 CommOutQueueSingle() : CommOutQueue(1, 0ms) {}
170
172 bool push_back(const std::string& line) override;
173};
174
178public:
179 MeasuredCommOutQueue(unsigned max_buffered,
180 std::chrono::duration<unsigned, std::milli> min_msg_gap)
181 : CommOutQueue(max_buffered, min_msg_gap), push_time(0), pop_time(0) {}
182
183 MeasuredCommOutQueue(unsigned max_buffered)
184 : MeasuredCommOutQueue(max_buffered, 0ms) {}
185
186 bool push_back(const std::string& line) override;
187
188 std::string pop() override;
189
190 std::unordered_map<unsigned long, PerfCounter> msg_perf;
191
192 PerfCounter perf;
193 double push_time;
194 double pop_time;
195};
196
199public:
201
202 bool push_back(const std::string& line) override {
203 std::lock_guard<std::mutex> lock(m_mutex);
204 buff.insert(buff.begin(), line);
205 return true;
206 }
207
208 std::string pop() override {
209 std::lock_guard<std::mutex> lock(m_mutex);
210 if (buff.size() <= 0)
211 throw std::underflow_error("Attempt to pop() from empty buffer");
212 auto line = buff.back();
213 buff.pop_back();
214 return line;
215 }
216
217 int size() const override {
218 std::lock_guard<std::mutex> lock(m_mutex);
219 return buff.size();
220 }
221
222private:
223 mutable std::mutex m_mutex;
224 std::vector<std::string> buff;
225};
226
227std::ostream& operator<<(std::ostream& os, const MeasuredCommOutQueue& q);
228
229#endif // COMM_OUT_QUEUE_H_
A CommOutQueue limited to one message of each kind.
bool push_back(const std::string &line) override
Insert line of NMEA0183 data in buffer.
Queue of NMEA0183 messages which only holds a limited amount of each message type.
CommOutQueue(unsigned max_buffered)
Create a buffer which stores at most max_buffered items of each message.
virtual int size() const
Return number of lines in queue.
virtual std::string pop()
Return next line to send and remove it from buffer, throws exception if empty.
CommOutQueue()
Default buffer, allows 10 buffered messages of each type, applies rate limits when repeated with less...
virtual bool push_back(const std::string &line)
Insert valid line of NMEA0183 data in buffer.
Simple FIFO queue without added logic.
std::string pop() override
Return next line to send and remove it from buffer, throws exception if empty.
int size() const override
Return number of lines in queue.
bool push_back(const std::string &line) override
Insert valid line of NMEA0183 data in buffer.
Add unit test measurements to CommOutQueue.
bool push_back(const std::string &line) override
Insert valid line of NMEA0183 data in buffer.
std::string pop() override
Return next line to send and remove it from buffer, throws exception if empty.