1#ifndef COMM__OUT_QUEUE_H__
2#define COMM__OUT_QUEUE_H__
9#include <unordered_map>
12using namespace std::literals::chrono_literals;
29 void in(
const size_t bytes,
bool ok) {
30 auto t1 = std::chrono::steady_clock::now();
31 std::chrono::duration<double, std::micro> us_time = t1 - last_in;
32 bps_in = 0.95 * bps_in + 0.05 * bytes * 1000000 / us_time.count();
33 mps_in = 0.95 * bps_in + 0.05 * 1000000 / us_time.count();
43 void out(
const size_t bytes,
44 std::chrono::time_point<std::chrono::steady_clock> in_ts) {
45 auto t1 = std::chrono::steady_clock::now();
46 std::chrono::duration<double, std::micro> us_time = t1 - last_in;
47 bps_out = 0.95 * bps_out + 0.05 * bytes * 1000000 / us_time.count();
48 mps_out = 0.95 * bps_out + 0.05 * 1000000 / us_time.count();
50 in_out_delay_us = 0.95 * in_out_delay_us + 0.05 * us_time.count();
65 size_t in_out_delay_us;
68 std::chrono::time_point<std::chrono::steady_clock> last_in;
69 std::chrono::time_point<std::chrono::steady_clock> last_out;
72std::ostream& operator<<(std::ostream& os,
const PerfCounter& pc);
84 virtual bool push_back(
const std::string& line);
90 virtual std::string
pop();
93 virtual int size()
const;
105 std::chrono::duration<unsigned, std::milli> min_msg_gap);
130 std::chrono::time_point<std::chrono::steady_clock> stamp;
133 std::vector<BufferItem> m_buffer;
134 mutable std::mutex m_mutex;
136 using duration_ms = std::chrono::duration<unsigned, std::milli>;
137 duration_ms m_min_msg_gap;
138 bool m_overrun_reported;
139 std::set<uint64_t> m_rate_limits_logged;
149 bool push_back(
const std::string& line)
override;
157 std::chrono::duration<unsigned, std::milli> min_msg_gap)
158 :
CommOutQueue(max_buffered, min_msg_gap), push_time(0), pop_time(0) {}
163 bool push_back(
const std::string& line)
override;
165 std::string
pop()
override;
167 std::unordered_map<unsigned long, PerfCounter> msg_perf;
180 std::lock_guard<std::mutex> lock(m_mutex);
181 buff.insert(buff.begin(), line);
185 std::string
pop()
override {
186 std::lock_guard<std::mutex> lock(m_mutex);
187 if (buff.size() <= 0)
188 throw std::underflow_error(
"Attempt to pop() from empty buffer");
189 auto line = buff.back();
195 std::lock_guard<std::mutex> lock(m_mutex);
200 mutable std::mutex m_mutex;
201 std::vector<std::string> buff;
A CommOutQueue limited to one message of each kind.
bool push_back(const std::string &line) override
Insert line of NMEA0183 data in buffer.
Queue of NMEA0183 messages that holds only a limited number of messages of each type.
CommOutQueue(unsigned max_buffered)
Create a buffer which stores at most max_buffered items of each message.
virtual int size() const
Return number of lines in queue.
virtual std::string pop()
Return next line to send and remove it from buffer, throws exception if empty.
CommOutQueue()
Default buffer, allows 10 buffered messages of each type, applies rate limits when repeated with less...
virtual bool push_back(const std::string &line)
Insert valid line of NMEA0183 data in buffer.
Simple FIFO queue without added logic.
std::string pop() override
Return next line to send and remove it from buffer, throws exception if empty.
int size() const override
Return number of lines in queue.
bool push_back(const std::string &line) override
Insert valid line of NMEA0183 data in buffer.
Add unit test measurements to CommOutQueue.
bool push_back(const std::string &line) override
Insert valid line of NMEA0183 data in buffer.
std::string pop() override
Return next line to send and remove it from buffer, throws exception if empty.