31#include "rapidjson/document.h" 
   36#include "model/geodesic.h" 
   38#include "wxServDisc.h" 
   42#include "ixwebsocket/IXNetSystem.h" 
   43#include "ixwebsocket/IXWebSocket.h" 
   44#include "ixwebsocket/IXUserAgent.h" 
   45#include "ixwebsocket/IXSocketTLSOptions.h" 
   47using namespace std::literals::chrono_literals;
 
   /// Timer id passed to m_socketread_watchdog_timer.SetOwner() so events
   /// from the socket-read watchdog timer can be told apart from other timers.
   static constexpr int kTimerSocket = 9006;
 
   56                             const wxString& PortName,
 
   57                             const wxString& strBaudRate);
 
   61  bool SetOutMsg(
const wxString& msg);
 
   65  void ThreadMessage(
const wxString& msg);
 
   66  bool OpenComPortPhysical(
const wxString& com_name, 
int baud_rate);
 
   67  void CloseComPortPhysical();
 
   68  size_t WriteComPortPhysical(std::vector<unsigned char> msg);
 
   69  size_t WriteComPortPhysical(
unsigned char* msg, 
size_t length);
 
   70  void SetGatewayOperationMode();
 
   74  wxString m_FullPortName;
 
   76  unsigned char* put_ptr;
 
   77  unsigned char* tak_ptr;
 
   79  unsigned char* rx_buffer;
 
 
   93      : wxEvent(
id, commandType) {};
 
   97  void SetPayload(std::shared_ptr<std::string> data) { m_payload = data; }
 
   98  std::shared_ptr<std::string> GetPayload() { 
return m_payload; }
 
  101  wxEvent* Clone()
 const {
 
  103    newevent->m_payload = this->m_payload;
 
  108  std::shared_ptr<std::string> m_payload;
 
 
  116                  wxEvtHandler* consumer, 
const std::string& token);
 
  117  virtual void* Entry();
 
  122  void HandleMessage(
const std::string& message);
 
  123  wxEvtHandler* s_wsSKConsumer;
 
  124  wxIPV4address m_address;
 
  125  wxEvtHandler* m_consumer;
 
  131  mutable std::mutex m_stats_mutex;
 
 
  135                                 wxIPV4address address, wxEvtHandler* consumer,
 
  136                                 const std::string& token)
 
  137    : m_address(address),
 
  138      m_consumer(consumer),
 
  139      m_parentStream(parent),
 
  141  resume_listener.
Init(SystemEvents::GetInstance().evt_resume,
 
  145                         wxLogDebug(
"WebSocketThread: restarted");
 
  149void* WebSocketThread::Entry() {
 
  150  using namespace std::chrono_literals;
 
  151  bool not_done = 
true;
 
  153  m_parentStream->SetThreadRunning(
true);
 
  155  s_wsSKConsumer = m_consumer;
 
  157  wxString host = m_address.IPAddress();
 
  158  int port = m_address.Service();
 
  161  std::stringstream wsAddress;
 
  162  wsAddress << 
"ws://" << host << 
":" << port
 
  163            << 
"/signalk/v1/stream?subscribe=all&sendCachedValues=false";
 
  164  std::stringstream wssAddress;
 
  165  wssAddress << 
"wss://" << host << 
":" << port
 
  166             << 
"/signalk/v1/stream?subscribe=all&sendCachedValues=false";
 
  168  if (!m_token.empty()) {
 
  169    wsAddress << 
"&token=" << m_token;
 
  170    wssAddress << 
"&token=" << m_token;
 
  173  ws.setUrl(wssAddress.str());
 
  174  ix::SocketTLSOptions opt;
 
  175  opt.disable_hostname_validation = 
true;
 
  177  ws.setTLSOptions(opt);
 
  178  ws.setPingInterval(30);
 
  180  auto message_callback = [&](
const ix::WebSocketMessagePtr& msg) {
 
  181    if (msg->type == ix::WebSocketMessageType::Message) {
 
  182      HandleMessage(msg->str);
 
  183    } 
else if (msg->type == ix::WebSocketMessageType::Open) {
 
  184      wxLogDebug(
"websocket: Connection established");
 
  185      std::lock_guard lock(m_stats_mutex);
 
  186      m_driver_stats.available = 
true;
 
  187    } 
else if (msg->type == ix::WebSocketMessageType::Close) {
 
  188      wxLogDebug(
"websocket: Connection disconnected");
 
  189      std::lock_guard lock(m_stats_mutex);
 
  190      m_driver_stats.available = 
false;
 
  191    } 
else if (msg->type == ix::WebSocketMessageType::Error) {
 
  192      std::lock_guard lock(m_stats_mutex);
 
  194      wxLogDebug(
"websocket: error: %s", msg->errorInfo.reason.c_str());
 
  195      ws.getUrl() == wsAddress.str() ? ws.setUrl(wssAddress.str())
 
  196                                     : ws.setUrl(wsAddress.str());
 
  200  ws.setOnMessageCallback(message_callback);
 
  203    std::lock_guard lock(m_stats_mutex);
 
  204    m_driver_stats.driver_bus = NavAddr::Bus::Signalk;
 
  205    m_driver_stats.driver_iface = m_parentStream->m_params.GetStrippedDSPort();
 
  206    m_driver_stats.available = 
false;
 
  211  while (m_parentStream->m_Thread_run_flag > 0) {
 
  212    std::this_thread::sleep_for(100ms);
 
  216  m_parentStream->SetThreadRunning(
false);
 
  217  m_parentStream->m_Thread_run_flag = -1;
 
  219    std::lock_guard lock(m_stats_mutex);
 
  220    m_driver_stats.available = 
false;
 
  227  std::lock_guard lock(m_stats_mutex);
 
  228  return m_driver_stats;
 
  231void WebSocketThread::HandleMessage(
const std::string& message) {
 
  232  if (s_wsSKConsumer) {
 
  234    auto buffer = std::make_shared<std::string>(message);
 
  236    signalKEvent.SetPayload(buffer);
 
  237    s_wsSKConsumer->AddPendingEvent(signalKEvent);
 
  251      m_Thread_run_flag(-1),
 
  253      m_listener(listener),
 
  254      m_stats_timer(*this, 2s) {
 
  256  Bind(wxEVT_COMMDRIVER_SIGNALK_NET, &CommDriverSignalKNet::handle_SK_sentence,
 
  259  m_addr.Hostname(params->NetworkAddress);
 
  260  m_addr.Service(params->NetworkPort);
 
  261  m_token = params->AuthToken;
 
  262  m_socketread_watchdog_timer.SetOwner(
this, kTimerSocket);
 
  264  m_threadActive = 
false;
 
  267  m_driver_stats.driver_bus = NavAddr::Bus::Signalk;
 
  268  m_driver_stats.driver_iface = m_params.GetStrippedDSPort();
 
  269  m_driver_stats.available = 
false;
 
  274CommDriverSignalKNet::~CommDriverSignalKNet() { Close(); }
 
  278    return m_wsThread->GetStats();
 
  280    return m_driver_stats;
 
 
  283void CommDriverSignalKNet::Open() {
 
  284  wxString discoveredIP;
 
  291    std::string serviceIdent =
 
  292        std::string(
"_signalk-ws._tcp.local.");  
 
  296void CommDriverSignalKNet::Close() { CloseWebSocket(); }
 
  298bool CommDriverSignalKNet::DiscoverSKServer(std::string serviceIdent,
 
  299                                            wxString& ip, 
int& port, 
int tSec) {
 
  300  wxServDisc* servscan =
 
  301      new wxServDisc(0, wxString(serviceIdent.c_str()), QTYPE_PTR);
 
  303  for (
int i = 0; i < 10; i++) {
 
  304    if (servscan->getResultCount()) {
 
  305      auto result = servscan->getResults().at(0);
 
  308      wxServDisc* namescan = 
new wxServDisc(0, result.name, QTYPE_SRV);
 
  309      for (
int j = 0; j < 10; j++) {
 
  310        if (namescan->getResultCount()) {
 
  311          auto namescanResult = namescan->getResults().at(0);
 
  312          port = namescanResult.port;
 
  315          wxServDisc* addrscan =
 
  316              new wxServDisc(0, namescanResult.name, QTYPE_A);
 
  317          for (
int k = 0; k < 10; k++) {
 
  318            if (addrscan->getResultCount()) {
 
  319              auto addrscanResult = addrscan->getResults().at(0);
 
  320              ip = addrscanResult.ip;
 
  326              wxMilliSleep(1000 * tSec / 10);
 
  333          wxMilliSleep(1000 * tSec / 10);
 
  340      wxMilliSleep(1000 * tSec / 10);
 
  348void CommDriverSignalKNet::OpenWebSocket() {
 
  350  wxLogMessage(wxString::Format(
"Opening Signal K WebSocket client: %s",
 
  351                                m_params.GetDSPort().c_str()));
 
  356  if (m_wsThread->Create() != wxTHREAD_NO_ERROR) {
 
  357    wxLogError(
"Can't create WebSocketThread!");
 
  363  GetSocketThreadWatchdogTimer()->Start(1000,
 
  370void CommDriverSignalKNet::CloseWebSocket() {
 
  372    if (IsThreadRunning()) {
 
  373      wxLogMessage(
"Stopping Secondary SignalK Thread");
 
  374      m_stats_timer.Stop();
 
  376      m_Thread_run_flag = 0;
 
  378      while (IsThreadRunning() && tsec) {
 
  384      if (m_Thread_run_flag <= 0)
 
  385        msg.Printf(
"Stopped in %d sec.", 10 - tsec);
 
  387        msg.Printf(
"Not Stopped after 10 sec.");
 
  395void CommDriverSignalKNet::handle_SK_sentence(
 
  397  rapidjson::Document root;
 
  401  std::string* msg = 
event.GetPayload().get();
 
  402  std::string msgTerminated = *msg;
 
  403  msgTerminated.append(
"\r\n");
 
  406  if (root.HasParseError()) {
 
  407    wxLogMessage(wxString::Format(
 
  408        "SignalKDataStream ERROR: the JSON document is not well-formed:%d",
 
  409        root.GetParseError()));
 
  413  if (!root.IsObject()) {
 
  414    wxLogMessage(wxString::Format(
 
  415        "SignalKDataStream ERROR: Message is not a JSON Object: %s",
 
  422  if (root.HasMember(
"version")) {
 
  423    wxString msg = 
"Connected to Signal K server version: ";
 
  424    msg << (root[
"version"].GetString());
 
  428  if (root.HasMember(
"self")) {
 
  429    if (strncmp(root[
"self"].GetString(), 
"vessels.", 8) == 0)
 
  430      m_self = (root[
"self"].GetString());  
 
  433      m_self = std::string(
"vessels.")
 
  434                   .append(root[
"self"].GetString());  
 
  437  if (root.HasMember(
"context") && root[
"context"].IsString()) {
 
  438    m_context = root[
"context"].GetString();
 
  442  auto pos = 
iface.find(
":");
 
  443  std::string comm_interface = 
"";
 
  444  if (pos != std::string::npos) comm_interface = 
iface.substr(pos + 1);
 
  445  auto navmsg = std::make_shared<const SignalkMsg>(
 
  446      m_self, m_context, msgTerminated, comm_interface);
 
  447  m_listener.
Notify(std::move(navmsg));
 
  450void CommDriverSignalKNet::initIXNetSystem() { ix::initNetSystem(); };
 
  452void CommDriverSignalKNet::uninitIXNetSystem() { ix::uninitNetSystem(); };
 
const std::string iface
Physical device for 0183, else a unique string.
DriverStats GetDriverStats() const override
Get the Driver Statistics.
Interface for handling incoming messages.
virtual void Notify(std::shared_ptr< const NavMsg > message)=0
Handle a received message.
Define an action to be performed when a KeyProvider is notified.
void Init(const KeyProvider &kp, const std::function< void(ObservedEvt &ev)> &action)
Initiate an object yet not listening.
Custom event class for OpenCPN's notification system.
Driver registration container, a singleton.
SignalK IP network driver.
Raw messages layer, supports sending and receiving navmsg messages.
General observable implementation with several specializations.
Driver statistics report.
unsigned rx_count
Number of bytes received since program start.
unsigned error_count
Number of detected errors since program start.
Suspend/resume and new devices events exchange point.