OpenCPN Partial API docs
Loading...
Searching...
No Matches
comm_out_queue.h
1#ifndef COMM__OUT_QUEUE_H__
2#define COMM__OUT_QUEUE_H__
3
4#include <chrono>
5#include <cstdint>
6#include <mutex>
7#include <set>
8#include <string>
9#include <unordered_map>
10#include <vector>
11
12using namespace std::literals::chrono_literals;
13
15public:
17 : msgs_in(0),
18 msgs_out(0),
19 bytes_in(0),
20 bytes_out(0),
21 bps_in(0),
22 mps_in(0),
23 bps_out(0),
24 mps_out(0),
25 in_out_delay_us(0),
26 overflow_msgs(0),
27 in_queue(0) {}
28
29 void in(const size_t bytes, bool ok) {
30 auto t1 = std::chrono::steady_clock::now();
31 std::chrono::duration<double, std::micro> us_time = t1 - last_in;
32 bps_in = 0.95 * bps_in + 0.05 * bytes * 1000000 / us_time.count();
33 mps_in = 0.95 * bps_in + 0.05 * 1000000 / us_time.count();
34 msgs_in++;
35 bytes_in += bytes;
36 last_in = t1;
37 if (!ok) {
38 overflow_msgs++;
39 }
40 in_queue++;
41 }
42
43 void out(const size_t bytes,
44 std::chrono::time_point<std::chrono::steady_clock> in_ts) {
45 auto t1 = std::chrono::steady_clock::now();
46 std::chrono::duration<double, std::micro> us_time = t1 - last_in;
47 bps_out = 0.95 * bps_out + 0.05 * bytes * 1000000 / us_time.count();
48 mps_out = 0.95 * bps_out + 0.05 * 1000000 / us_time.count();
49 us_time = t1 - in_ts;
50 in_out_delay_us = 0.95 * in_out_delay_us + 0.05 * us_time.count();
51 msgs_out++;
52 bytes_out += bytes;
53 last_out = t1;
54 in_queue--;
55 }
56
57 size_t msgs_in;
58 size_t msgs_out;
59 size_t bytes_in;
60 size_t bytes_out;
61 uint32_t bps_in;
62 double mps_in;
63 uint32_t bps_out;
64 double mps_out;
65 size_t in_out_delay_us;
66 size_t overflow_msgs;
67 size_t in_queue;
68 std::chrono::time_point<std::chrono::steady_clock> last_in;
69 std::chrono::time_point<std::chrono::steady_clock> last_out;
70};
71
72std::ostream& operator<<(std::ostream& os, const PerfCounter& pc);
73
79public:
84 virtual bool push_back(const std::string& line);
85
90 virtual std::string pop();
91
93 virtual int size() const;
94
104 CommOutQueue(unsigned max_buffered,
105 std::chrono::duration<unsigned, std::milli> min_msg_gap);
110 CommOutQueue(unsigned max_buffered) : CommOutQueue(max_buffered, 0ms) {}
111
117
118 // Disable copying and assignment
119 CommOutQueue(const CommOutQueue& other) = delete;
120 CommOutQueue& operator=(const CommOutQueue&) = delete;
121
122 virtual ~CommOutQueue() = default;
123
124protected:
/** One buffered line plus the bookkeeping data stored alongside it. */
struct BufferItem {
  /** Numeric message type; NOTE(review): how it is derived is not visible here. */
  uint64_t type;

  /** The buffered line. */
  std::string line;

  /** Steady-clock time stamp; presumably set on construction -- confirm. */
  std::chrono::time_point<std::chrono::steady_clock> stamp;

  BufferItem(const std::string& line);
  BufferItem(const BufferItem& other);
};
132
133 std::vector<BufferItem> m_buffer;
134 mutable std::mutex m_mutex;
135 int m_size;
136 using duration_ms = std::chrono::duration<unsigned, std::milli>;
137 duration_ms m_min_msg_gap;
138 bool m_overrun_reported;
139 std::set<uint64_t> m_rate_limits_logged;
140 ;
141};
142
145public:
146 CommOutQueueSingle() : CommOutQueue(1, 0ms) {}
147
149 bool push_back(const std::string& line) override;
150};
151
155public:
156 MeasuredCommOutQueue(unsigned max_buffered,
157 std::chrono::duration<unsigned, std::milli> min_msg_gap)
158 : CommOutQueue(max_buffered, min_msg_gap), push_time(0), pop_time(0) {}
159
160 MeasuredCommOutQueue(unsigned max_buffered)
161 : MeasuredCommOutQueue(max_buffered, 0ms) {}
162
163 bool push_back(const std::string& line) override;
164
165 std::string pop() override;
166
167 std::unordered_map<unsigned long, PerfCounter> msg_perf;
168
169 PerfCounter perf;
170 double push_time;
171 double pop_time;
172};
173
176public:
178
179 bool push_back(const std::string& line) override {
180 std::lock_guard<std::mutex> lock(m_mutex);
181 buff.insert(buff.begin(), line);
182 return true;
183 }
184
185 std::string pop() override {
186 std::lock_guard<std::mutex> lock(m_mutex);
187 if (buff.size() <= 0)
188 throw std::underflow_error("Attempt to pop() from empty buffer");
189 auto line = buff.back();
190 buff.pop_back();
191 return line;
192 }
193
194 int size() const override {
195 std::lock_guard<std::mutex> lock(m_mutex);
196 return buff.size();
197 }
198
199private:
200 mutable std::mutex m_mutex;
201 std::vector<std::string> buff;
202};
203
204std::ostream& operator<<(std::ostream& os, const MeasuredCommOutQueue& q);
205
206#endif // COMM__OUT_QUEUE_H__
A CommOutQueue limited to one message of each kind.
bool push_back(const std::string &line) override
Insert line of NMEA0183 data in buffer.
Queue of NMEA0183 messages which only holds a limited amount of each message type.
CommOutQueue(unsigned max_buffered)
Create a buffer which stores at most max_buffered items of each message.
virtual int size() const
Return number of lines in queue.
virtual std::string pop()
Return next line to send and remove it from buffer, throws exception if empty.
CommOutQueue()
Default buffer, allows 10 buffered messages of each type, applies rate limits when repeated with less...
virtual bool push_back(const std::string &line)
Insert valid line of NMEA0183 data in buffer.
Simple FIFO queue without added logic.
std::string pop() override
Return next line to send and remove it from buffer, throws exception if empty.
int size() const override
Return number of lines in queue.
bool push_back(const std::string &line) override
Insert valid line of NMEA0183 data in buffer.
Add unit test measurements to CommOutQueue.
bool push_back(const std::string &line) override
Insert valid line of NMEA0183 data in buffer.
std::string pop() override
Return next line to send and remove it from buffer, throws exception if empty.