24#ifndef COMM_OUT_QUEUE_H_
25#define COMM_OUT_QUEUE_H_
32#include <unordered_map>
35using namespace std::literals::chrono_literals;
52 void in(
const size_t bytes,
bool ok) {
53 auto t1 = std::chrono::steady_clock::now();
54 std::chrono::duration<double, std::micro> us_time = t1 - last_in;
55 bps_in = 0.95 * bps_in + 0.05 * bytes * 1000000 / us_time.count();
56 mps_in = 0.95 * bps_in + 0.05 * 1000000 / us_time.count();
66 void out(
const size_t bytes,
67 std::chrono::time_point<std::chrono::steady_clock> in_ts) {
68 auto t1 = std::chrono::steady_clock::now();
69 std::chrono::duration<double, std::micro> us_time = t1 - last_in;
70 bps_out = 0.95 * bps_out + 0.05 * bytes * 1000000 / us_time.count();
71 mps_out = 0.95 * bps_out + 0.05 * 1000000 / us_time.count();
73 in_out_delay_us = 0.95 * in_out_delay_us + 0.05 * us_time.count();
88 size_t in_out_delay_us;
91 std::chrono::time_point<std::chrono::steady_clock> last_in;
92 std::chrono::time_point<std::chrono::steady_clock> last_out;
95std::ostream& operator<<(std::ostream& os,
const PerfCounter& pc);
107 virtual bool push_back(
const std::string& line);
113 virtual std::string
pop();
116 virtual int size()
const;
128 std::chrono::duration<unsigned, std::milli> min_msg_gap);
153 std::chrono::time_point<std::chrono::steady_clock> stamp;
156 std::vector<BufferItem> m_buffer;
157 mutable std::mutex m_mutex;
159 using duration_ms = std::chrono::duration<unsigned, std::milli>;
160 duration_ms m_min_msg_gap;
161 bool m_overrun_reported;
162 std::set<uint64_t> m_rate_limits_logged;
172 bool push_back(
const std::string& line)
override;
180 std::chrono::duration<unsigned, std::milli> min_msg_gap)
181 :
CommOutQueue(max_buffered, min_msg_gap), push_time(0), pop_time(0) {}
186 bool push_back(
const std::string& line)
override;
188 std::string
pop()
override;
190 std::unordered_map<unsigned long, PerfCounter> msg_perf;
203 std::lock_guard<std::mutex> lock(m_mutex);
204 buff.insert(buff.begin(), line);
208 std::string
pop()
override {
209 std::lock_guard<std::mutex> lock(m_mutex);
210 if (buff.size() <= 0)
211 throw std::underflow_error(
"Attempt to pop() from empty buffer");
212 auto line = buff.back();
218 std::lock_guard<std::mutex> lock(m_mutex);
223 mutable std::mutex m_mutex;
224 std::vector<std::string> buff;
A CommOutQueue limited to one message of each kind.
bool push_back(const std::string &line) override
Insert line of NMEA0183 data in buffer.
Queue of NMEA0183 messages which only holds a limited amount of each message type.
CommOutQueue(unsigned max_buffered)
Create a buffer which stores at most max_buffered items of each message.
virtual int size() const
Return number of lines in queue.
virtual std::string pop()
Return next line to send and remove it from buffer, throws exception if empty.
CommOutQueue()
Default buffer, allows 10 buffered messages of each type, applies rate limits when repeated with less...
virtual bool push_back(const std::string &line)
Insert valid line of NMEA0183 data in buffer.
Simple FIFO queue without added logic.
std::string pop() override
Return next line to send and remove it from buffer, throws exception if empty.
int size() const override
Return number of lines in queue.
bool push_back(const std::string &line) override
Insert valid line of NMEA0183 data in buffer.
Add unit test measurements to CommOutQueue.
bool push_back(const std::string &line) override
Insert valid line of NMEA0183 data in buffer.
std::string pop() override
Return next line to send and remove it from buffer, throws exception if empty.