31#include "rapidjson/document.h"
36#include "model/geodesic.h"
38#include "wxServDisc.h"
42#include "ixwebsocket/IXNetSystem.h"
43#include "ixwebsocket/IXWebSocket.h"
44#include "ixwebsocket/IXUserAgent.h"
45#include "ixwebsocket/IXSocketTLSOptions.h"
47using namespace std::literals::chrono_literals;
49static const int kTimerSocket = 9006;
56 const wxString& PortName,
57 const wxString& strBaudRate);
61 bool SetOutMsg(
const wxString& msg);
65 void ThreadMessage(
const wxString& msg);
66 bool OpenComPortPhysical(
const wxString& com_name,
int baud_rate);
67 void CloseComPortPhysical();
68 size_t WriteComPortPhysical(std::vector<unsigned char> msg);
69 size_t WriteComPortPhysical(
unsigned char* msg,
size_t length);
70 void SetGatewayOperationMode();
74 wxString m_FullPortName;
76 unsigned char* put_ptr;
77 unsigned char* tak_ptr;
79 unsigned char* rx_buffer;
93 : wxEvent(
id, commandType) {};
/**
 * Attach a payload to this event.
 * @param data Shared pointer to the payload string; it is copied into
 *             m_payload.
 */
97 void SetPayload(std::shared_ptr<std::string> data) { m_payload = data; }
/**
 * Retrieve the payload attached to this event.
 * @return A copy of the shared pointer held in m_payload.
 */
98 std::shared_ptr<std::string> GetPayload() {
return m_payload; }
101 wxEvent* Clone()
const {
103 newevent->m_payload = this->m_payload;
108 std::shared_ptr<std::string> m_payload;
116 wxEvtHandler* consumer,
const std::string& token);
117 virtual void* Entry();
122 void HandleMessage(
const std::string& message);
123 wxEvtHandler* s_wsSKConsumer;
124 wxIPV4address m_address;
125 wxEvtHandler* m_consumer;
131 mutable std::mutex m_stats_mutex;
135 wxIPV4address address, wxEvtHandler* consumer,
136 const std::string& token)
137 : m_address(address),
138 m_consumer(consumer),
139 m_parentStream(parent),
141 resume_listener.
Init(SystemEvents::GetInstance().evt_resume,
145 wxLogDebug(
"WebSocketThread: restarted");
149void* WebSocketThread::Entry() {
150 using namespace std::chrono_literals;
151 bool not_done =
true;
153 m_parentStream->SetThreadRunning(
true);
155 s_wsSKConsumer = m_consumer;
157 wxString host = m_address.IPAddress();
158 int port = m_address.Service();
161 std::stringstream wsAddress;
162 wsAddress <<
"ws://" << host <<
":" << port
163 <<
"/signalk/v1/stream?subscribe=all&sendCachedValues=false";
164 std::stringstream wssAddress;
165 wssAddress <<
"wss://" << host <<
":" << port
166 <<
"/signalk/v1/stream?subscribe=all&sendCachedValues=false";
168 if (!m_token.empty()) {
169 wsAddress <<
"&token=" << m_token;
170 wssAddress <<
"&token=" << m_token;
173 ws.setUrl(wssAddress.str());
174 ix::SocketTLSOptions opt;
175 opt.disable_hostname_validation =
true;
177 ws.setTLSOptions(opt);
178 ws.setPingInterval(30);
180 auto message_callback = [&](
const ix::WebSocketMessagePtr& msg) {
181 if (msg->type == ix::WebSocketMessageType::Message) {
182 HandleMessage(msg->str);
183 }
else if (msg->type == ix::WebSocketMessageType::Open) {
184 wxLogDebug(
"websocket: Connection established");
185 std::lock_guard lock(m_stats_mutex);
186 m_driver_stats.available =
true;
187 }
else if (msg->type == ix::WebSocketMessageType::Close) {
188 wxLogDebug(
"websocket: Connection disconnected");
189 std::lock_guard lock(m_stats_mutex);
190 m_driver_stats.available =
false;
191 }
else if (msg->type == ix::WebSocketMessageType::Error) {
192 std::lock_guard lock(m_stats_mutex);
194 wxLogDebug(
"websocket: error: %s", msg->errorInfo.reason.c_str());
195 ws.getUrl() == wsAddress.str() ? ws.setUrl(wssAddress.str())
196 : ws.setUrl(wsAddress.str());
200 ws.setOnMessageCallback(message_callback);
203 std::lock_guard lock(m_stats_mutex);
204 m_driver_stats.driver_bus = NavAddr::Bus::Signalk;
205 m_driver_stats.driver_iface = m_parentStream->m_params.GetStrippedDSPort();
206 m_driver_stats.available =
false;
211 while (m_parentStream->m_Thread_run_flag > 0) {
212 std::this_thread::sleep_for(100ms);
216 m_parentStream->SetThreadRunning(
false);
217 m_parentStream->m_Thread_run_flag = -1;
219 std::lock_guard lock(m_stats_mutex);
220 m_driver_stats.available =
false;
227 std::lock_guard lock(m_stats_mutex);
228 return m_driver_stats;
231void WebSocketThread::HandleMessage(
const std::string& message) {
232 if (s_wsSKConsumer) {
234 auto buffer = std::make_shared<std::string>(message);
236 signalKEvent.SetPayload(buffer);
237 s_wsSKConsumer->AddPendingEvent(signalKEvent);
251 m_Thread_run_flag(-1),
253 m_listener(listener),
254 m_stats_timer(*this, 2s) {
256 Bind(wxEVT_COMMDRIVER_SIGNALK_NET, &CommDriverSignalKNet::handle_SK_sentence,
259 m_addr.Hostname(params->NetworkAddress);
260 m_addr.Service(params->NetworkPort);
261 m_token = params->AuthToken;
262 m_socketread_watchdog_timer.SetOwner(
this, kTimerSocket);
264 m_threadActive =
false;
267 m_driver_stats.driver_bus = NavAddr::Bus::Signalk;
268 m_driver_stats.driver_iface = m_params.GetStrippedDSPort();
269 m_driver_stats.available =
false;
/** Destructor: calls Close() when the driver object is destroyed. */
274CommDriverSignalKNet::~CommDriverSignalKNet() { Close(); }
278 return m_wsThread->GetStats();
280 return m_driver_stats;
283void CommDriverSignalKNet::Open() {
284 wxString discoveredIP;
291 std::string serviceIdent =
292 std::string(
"_signalk-ws._tcp.local.");
/** Close the driver; this simply delegates to CloseWebSocket(). */
296void CommDriverSignalKNet::Close() { CloseWebSocket(); }
298bool CommDriverSignalKNet::DiscoverSKServer(std::string serviceIdent,
299 wxString& ip,
int& port,
int tSec) {
300 wxServDisc* servscan =
301 new wxServDisc(0, wxString(serviceIdent.c_str()), QTYPE_PTR);
303 for (
int i = 0; i < 10; i++) {
304 if (servscan->getResultCount()) {
305 auto result = servscan->getResults().at(0);
308 wxServDisc* namescan =
new wxServDisc(0, result.name, QTYPE_SRV);
309 for (
int j = 0; j < 10; j++) {
310 if (namescan->getResultCount()) {
311 auto namescanResult = namescan->getResults().at(0);
312 port = namescanResult.port;
315 wxServDisc* addrscan =
316 new wxServDisc(0, namescanResult.name, QTYPE_A);
317 for (
int k = 0; k < 10; k++) {
318 if (addrscan->getResultCount()) {
319 auto addrscanResult = addrscan->getResults().at(0);
320 ip = addrscanResult.ip;
326 wxMilliSleep(1000 * tSec / 10);
333 wxMilliSleep(1000 * tSec / 10);
340 wxMilliSleep(1000 * tSec / 10);
348void CommDriverSignalKNet::OpenWebSocket() {
350 wxLogMessage(wxString::Format(
"Opening Signal K WebSocket client: %s",
351 m_params.GetDSPort().c_str()));
356 if (m_wsThread->Create() != wxTHREAD_NO_ERROR) {
357 wxLogError(
"Can't create WebSocketThread!");
363 GetSocketThreadWatchdogTimer()->Start(1000,
370void CommDriverSignalKNet::CloseWebSocket() {
372 if (IsThreadRunning()) {
373 wxLogMessage(
"Stopping Secondary SignalK Thread");
374 m_stats_timer.Stop();
376 m_Thread_run_flag = 0;
378 while (IsThreadRunning() && tsec) {
384 if (m_Thread_run_flag <= 0)
385 msg.Printf(
"Stopped in %d sec.", 10 - tsec);
387 msg.Printf(
"Not Stopped after 10 sec.");
395void CommDriverSignalKNet::handle_SK_sentence(
397 rapidjson::Document root;
401 std::string* msg =
event.GetPayload().get();
402 std::string msgTerminated = *msg;
403 msgTerminated.append(
"\r\n");
406 if (root.HasParseError()) {
407 wxLogMessage(wxString::Format(
408 "SignalKDataStream ERROR: the JSON document is not well-formed:%d",
409 root.GetParseError()));
413 if (!root.IsObject()) {
414 wxLogMessage(wxString::Format(
415 "SignalKDataStream ERROR: Message is not a JSON Object: %s",
422 if (root.HasMember(
"version")) {
423 wxString msg =
"Connected to Signal K server version: ";
424 msg << (root[
"version"].GetString());
428 if (root.HasMember(
"self")) {
429 if (strncmp(root[
"self"].GetString(),
"vessels.", 8) == 0)
430 m_self = (root[
"self"].GetString());
433 m_self = std::string(
"vessels.")
434 .append(root[
"self"].GetString());
437 if (root.HasMember(
"context") && root[
"context"].IsString()) {
438 m_context = root[
"context"].GetString();
442 auto pos =
iface.find(
":");
443 std::string comm_interface =
"";
444 if (pos != std::string::npos) comm_interface =
iface.substr(pos + 1);
445 auto navmsg = std::make_shared<const SignalkMsg>(
446 m_self, m_context, msgTerminated, comm_interface);
447 m_listener.
Notify(std::move(navmsg));
/**
 * Forward to ix::initNetSystem() from the IXWebSocket library.
 * NOTE(review): the ';' after the function body is a redundant empty
 * declaration.
 */
450void CommDriverSignalKNet::initIXNetSystem() { ix::initNetSystem(); };
/**
 * Forward to ix::uninitNetSystem() from the IXWebSocket library.
 * NOTE(review): presumably meant to be paired with initIXNetSystem() --
 * confirm against callers. The ';' after the function body is a redundant
 * empty declaration.
 */
452void CommDriverSignalKNet::uninitIXNetSystem() { ix::uninitNetSystem(); };
const std::string iface
Physical device for 0183, else a unique string.
DriverStats GetDriverStats() const override
Get the Driver Statistics.
Interface for handling incoming messages.
virtual void Notify(std::shared_ptr< const NavMsg > message)=0
Handle a received message.
Define an action to be performed when a KeyProvider is notified.
void Init(const KeyProvider &kp, const std::function< void(ObservedEvt &ev)> &action)
Initialize an object that is not yet listening.
Custom event class for OpenCPN's notification system.
Driver registration container, a singleton.
SignalK IP network driver.
Raw messages layer, supports sending and receiving navmsg messages.
General observable implementation with several specializations.
Driver statistics report.
unsigned rx_count
Number of bytes received since program start.
unsigned error_count
Number of detected errors since program start.
Suspend/resume and new devices events exchange point.