35#if __BYTE_ORDER == __LITTLE_ENDIAN 
   36static const uint64_t kFirstFiveBytes = 0x000000ffffffffff;
 
   38static const uint64_t kFirstFiveBytes = 0xffffffffff000000;
 
   41#define PUBX 190459303248    
   42#define STALK 323401897043   
   52static inline uint64_t GetNmeaType(
const std::string& line) {
 
   54  if (line[0] == 0x5c) {  
 
   56    skipchars = line.find(
',', 1);
 
   57    if (skipchars == std::string::npos) {
 
   62  uint64_t result = *
reinterpret_cast<const uint64_t*
>(&line[skipchars]);
 
   63  uint64_t result5 = result & kFirstFiveBytes;
 
   64  if (result5 == PUBX || result5 == STALK) {
 
   75static void ReportOverrun(
const std::string& msg, 
bool overrun_reported) {
 
   76  auto& registry = CommDriverRegistry::GetInstance();
 
   82  DEBUG_LOG << 
"CommOutQueue: Overrun on: " << msg;
 
   83  if (!overrun_reported) registry.evt_comm_overrun.Notify(msg);
 
   86CommOutQueue::BufferItem::BufferItem(
const std::string& _line)
 
   87    : type(GetNmeaType(_line)),
 
   89      stamp(std::chrono::steady_clock::now()) {}
 
   91CommOutQueue::BufferItem::BufferItem(
const BufferItem& other)
 
   94      stamp(std::chrono::steady_clock::now()) {}
 
   96using duration_ms = std::chrono::duration<unsigned, std::milli>;
 
   99    : m_size(max_buffered - 1),
 
  100      m_min_msg_gap(min_msg_gap),
 
  101      m_overrun_reported(false) {
 
  102  assert(max_buffered >= 1 && 
"Illegal buffer size");
 
 
  106  if (line.size() < 7) 
return false;
 
  108  auto match = [item](
const BufferItem& it) { 
return it.type == item.type; };
 
  110  std::lock_guard<std::mutex> lock(m_mutex);
 
  111  int found = std::count_if(m_buffer.begin(), m_buffer.end(), match);
 
  113    auto it = std::find_if(m_buffer.begin(), m_buffer.end(), match);
 
  114    assert(it != m_buffer.end());
 
  115    auto timespan = item.stamp - it->stamp;
 
  116    if (timespan < m_min_msg_gap) {
 
  118      if (m_rate_limits_logged.find(item.type) != m_rate_limits_logged.end()) {
 
  119        m_rate_limits_logged.insert(item.type);
 
  120        wxLogMessage(
"Limiting output rate for %u, message: %s", item.type,
 
  125  if (found > m_size) {
 
  128    if (!m_overrun_reported) {
 
  129      ReportOverrun(line, m_overrun_reported);
 
  130      m_overrun_reported = 
true;
 
  134      return it.type == item.type && matches++ >= m_size;
 
  136    m_buffer.erase(std::remove_if(m_buffer.begin(), m_buffer.end(), match_cnt),
 
  139  m_buffer.insert(m_buffer.begin(), item);
 
 
  144  std::lock_guard<std::mutex> lock(m_mutex);
 
  146  if (m_buffer.size() <= 0)
 
  147    throw std::underflow_error(
"Attempt to pop() from empty buffer");
 
  148  auto item = m_buffer.back();
 
 
  154  std::lock_guard<std::mutex> lock(m_mutex);
 
  155  return m_buffer.size();
 
 
  159  if (line.size() < 7) 
return false;
 
  161  auto match = [&item](
const BufferItem& it) { 
return it.type == item.type; };
 
  163  std::lock_guard<std::mutex> lock(m_mutex);
 
  164  auto found = std::find_if(m_buffer.begin(), m_buffer.end(), match);
 
  165  if (found != m_buffer.end()) {
 
  167    m_buffer.erase(std::remove_if(found, m_buffer.end(), match),
 
  170  m_buffer.push_back(item);
 
 
  175  using std::chrono::duration;
 
  176  using std::chrono::steady_clock;
 
  178  auto t1 = steady_clock::now();
 
  180  msg_perf[GetNmeaType(line)].in(line.size(), ok);
 
  181  perf.in(line.size(), ok);
 
  182  auto t2 = steady_clock::now();
 
  183  duration<double, std::micro> us_time = t2 - t1;
 
  185  push_time = 0.95 * push_time + 0.05 * us_time.count();  
 
 
  190  using std::chrono::duration;
 
  191  using std::chrono::steady_clock;
 
  193  auto t1 = steady_clock::now();
 
  196  std::lock_guard<std::mutex> lock(m_mutex);
 
  197  if (m_buffer.size() <= 0)
 
  198    throw std::underflow_error(
"Attempt to pop() from empty buffer");
 
  199  auto item = m_buffer.back();
 
  201  perf.out(item.line.size(), item.stamp);
 
  202  msg_perf[item.type].out(item.line.size(), item.stamp);
 
  203  auto t2 = steady_clock::now();
 
  204  duration<double, std::micro> us_time = t2 - t1;
 
  207  pop_time = 0.95 * pop_time + 0.05 * us_time.count();  
 
 
  213  os << 
"push_time: " << q.push_time << 
", ";
 
  214  os << 
"pop_time: " << q.pop_time << 
", ";
 
  215  os << 
"perf: " << q.perf << 
", ";
 
  217  for (
const auto& kv : q.msg_perf) {
 
  218    os << kv.first << 
": " << kv.second << 
", ";
 
  225std::ostream& operator<<(std::ostream& os, 
const PerfCounter& pc) {
 
  227  os << 
"msgs_in: " << pc.msgs_in << 
", ";
 
  228  os << 
"msgs_out: " << pc.msgs_out << 
", ";
 
  229  os << 
"bytes_in: " << pc.bytes_in << 
", ";
 
  230  os << 
"bytes_out: " << pc.bytes_out << 
", ";
 
  231  os << 
"bps_in: " << pc.bps_in << 
", ";
 
  232  os << 
"mps_in: " << pc.mps_in << 
", ";
 
  233  os << 
"bps_out: " << pc.bps_out << 
", ";
 
  234  os << 
"mps_out: " << pc.mps_out << 
", ";
 
  235  os << 
"in_out_delay_us: " << pc.in_out_delay_us << 
", ";
 
  236  os << 
"overflow_msgs: " << pc.overflow_msgs << 
", ";
 
  237  os << 
"in_queue: " << pc.in_queue;
 
bool push_back(const std::string &line) override
Insert line of NMEA0183 data in buffer.
 
virtual int size() const
Return number of lines in queue.
 
virtual std::string pop()
Return next line to send and remove it from buffer, throws exception if empty.
 
CommOutQueue()
Default buffer, allows 10 buffered messages of each type, applies rate limits when repeated with less...
 
virtual bool push_back(const std::string &line)
Insert valid line of NMEA0183 data in buffer.
 
Add unit test measurements to CommOutQueue.
 
bool push_back(const std::string &line) override
Insert valid line of NMEA0183 data in buffer.
 
std::string pop() override
Return next line to send and remove it from buffer, throws exception if empty.
 
Driver registration container, a singleton.
 
Communications output queue.
 
Enhanced logging interface on top of wx/log.h.