32#include "rapidjson/document.h"
34#include "model/comm_drv_signalk_net.h"
37#include "model/geodesic.h"
39#include "wxServDisc.h"
41#include "observable.h"
43#include "ixwebsocket/IXNetSystem.h"
44#include "ixwebsocket/IXWebSocket.h"
45#include "ixwebsocket/IXUserAgent.h"
46#include "ixwebsocket/IXSocketTLSOptions.h"
47using namespace std::literals::chrono_literals;
49const int kTimerSocket = 9006;
56 const wxString& PortName,
57 const wxString& strBaudRate);
61 bool SetOutMsg(
const wxString& msg);
65 void ThreadMessage(
const wxString& msg);
66 bool OpenComPortPhysical(
const wxString& com_name,
int baud_rate);
67 void CloseComPortPhysical();
68 size_t WriteComPortPhysical(std::vector<unsigned char> msg);
69 size_t WriteComPortPhysical(
unsigned char* msg,
size_t length);
70 void SetGatewayOperationMode(
void);
74 wxString m_FullPortName;
76 unsigned char* put_ptr;
77 unsigned char* tak_ptr;
79 unsigned char* rx_buffer;
93 : wxEvent(
id, commandType) {};
97 void SetPayload(std::shared_ptr<std::string> data) { m_payload = data; }
98 std::shared_ptr<std::string> GetPayload() {
return m_payload; }
101 wxEvent* Clone()
const {
103 newevent->m_payload = this->m_payload;
108 std::shared_ptr<std::string> m_payload;
116 wxEvtHandler* consumer,
const std::string& token);
117 virtual void* Entry();
122 void HandleMessage(
const std::string& message);
123 wxEvtHandler* s_wsSKConsumer;
124 wxIPV4address m_address;
125 wxEvtHandler* m_consumer;
131 mutable std::mutex m_stats_mutex;
135 wxIPV4address address, wxEvtHandler* consumer,
136 const std::string& token)
137 : m_address(address),
138 m_consumer(consumer),
139 m_parentStream(parent),
141 resume_listener.
Init(SystemEvents::GetInstance().evt_resume,
145 wxLogDebug(
"WebSocketThread: restarted");
149void* WebSocketThread::Entry() {
150 using namespace std::chrono_literals;
151 bool not_done =
true;
153 m_parentStream->SetThreadRunning(
true);
155 s_wsSKConsumer = m_consumer;
157 wxString host = m_address.IPAddress();
158 int port = m_address.Service();
161 std::stringstream wsAddress;
162 wsAddress <<
"ws://" << host <<
":" << port
163 <<
"/signalk/v1/stream?subscribe=all&sendCachedValues=false";
164 std::stringstream wssAddress;
165 wssAddress <<
"wss://" << host <<
":" << port
166 <<
"/signalk/v1/stream?subscribe=all&sendCachedValues=false";
168 if (!m_token.empty()) {
169 wsAddress <<
"&token=" << m_token;
170 wssAddress <<
"&token=" << m_token;
173 ws.setUrl(wssAddress.str());
174 ix::SocketTLSOptions opt;
175 opt.disable_hostname_validation =
true;
177 ws.setTLSOptions(opt);
178 ws.setPingInterval(30);
180 auto message_callback = [&](
const ix::WebSocketMessagePtr& msg) {
181 if (msg->type == ix::WebSocketMessageType::Message) {
182 HandleMessage(msg->str);
183 }
else if (msg->type == ix::WebSocketMessageType::Open) {
184 wxLogDebug(
"websocket: Connection established");
185 std::lock_guard lock(m_stats_mutex);
186 m_driver_stats.available =
true;
187 }
else if (msg->type == ix::WebSocketMessageType::Close) {
188 wxLogDebug(
"websocket: Connection disconnected");
189 std::lock_guard lock(m_stats_mutex);
190 m_driver_stats.available =
false;
191 }
else if (msg->type == ix::WebSocketMessageType::Error) {
192 std::lock_guard lock(m_stats_mutex);
194 wxLogDebug(
"websocket: error: %s", msg->errorInfo.reason.c_str());
195 ws.getUrl() == wsAddress.str() ? ws.setUrl(wssAddress.str())
196 : ws.setUrl(wsAddress.str());
200 ws.setOnMessageCallback(message_callback);
203 std::lock_guard lock(m_stats_mutex);
204 m_driver_stats.driver_bus = NavAddr::Bus::Signalk;
205 m_driver_stats.driver_iface = m_parentStream->m_params.GetStrippedDSPort();
206 m_driver_stats.available =
false;
211 while (m_parentStream->m_Thread_run_flag > 0) {
212 std::this_thread::sleep_for(100ms);
216 m_parentStream->SetThreadRunning(
false);
217 m_parentStream->m_Thread_run_flag = -1;
219 std::lock_guard lock(m_stats_mutex);
220 m_driver_stats.available =
false;
227 std::lock_guard lock(m_stats_mutex);
228 return m_driver_stats;
231void WebSocketThread::HandleMessage(
const std::string& message) {
232 if (s_wsSKConsumer) {
234 auto buffer = std::make_shared<std::string>(message);
236 signalKEvent.SetPayload(buffer);
237 s_wsSKConsumer->AddPendingEvent(signalKEvent);
251 m_Thread_run_flag(-1),
253 m_listener(listener),
254 m_stats_timer(*this, 2s) {
256 Bind(wxEVT_COMMDRIVER_SIGNALK_NET, &CommDriverSignalKNet::handle_SK_sentence,
259 m_addr.Hostname(params->NetworkAddress);
260 m_addr.Service(params->NetworkPort);
261 m_token = params->AuthToken;
262 m_socketread_watchdog_timer.SetOwner(
this, kTimerSocket);
264 m_threadActive =
false;
267 m_driver_stats.driver_bus = NavAddr::Bus::Signalk;
268 m_driver_stats.driver_iface = m_params.GetStrippedDSPort();
269 m_driver_stats.available =
false;
274CommDriverSignalKNet::~CommDriverSignalKNet() { Close(); }
276DriverStats CommDriverSignalKNet::GetDriverStats()
const {
278 return m_wsThread->GetStats();
280 return m_driver_stats;
283void CommDriverSignalKNet::Open(
void) {
284 wxString discoveredIP;
291 std::string serviceIdent =
292 std::string(
"_signalk-ws._tcp.local.");
294 if (m_params->AutoSKDiscover) {
295 if (DiscoverSKServer(serviceIdent, discoveredIP, discoveredPort,
297 wxLogDebug(wxString::Format(
298 _T(
"SK server autodiscovery finds WebSocket service: %s:%d"),
299 discoveredIP.c_str(), discoveredPort));
300 m_addr.Hostname(discoveredIP);
301 m_addr.Service(discoveredPort);
306 params->NetworkAddress = discoveredIP;
307 params->NetworkPort = discoveredPort;
309 wxLogDebug(_T(
"SK server autodiscovery finds no WebSocket server."));
315void CommDriverSignalKNet::Close() { CloseWebSocket(); }
317bool CommDriverSignalKNet::DiscoverSKServer(std::string serviceIdent,
318 wxString& ip,
int& port,
int tSec) {
319 wxServDisc* servscan =
320 new wxServDisc(0, wxString(serviceIdent.c_str()), QTYPE_PTR);
322 for (
int i = 0; i < 10; i++) {
323 if (servscan->getResultCount()) {
324 auto result = servscan->getResults().at(0);
327 wxServDisc* namescan =
new wxServDisc(0, result.name, QTYPE_SRV);
328 for (
int j = 0; j < 10; j++) {
329 if (namescan->getResultCount()) {
330 auto namescanResult = namescan->getResults().at(0);
331 port = namescanResult.port;
334 wxServDisc* addrscan =
335 new wxServDisc(0, namescanResult.name, QTYPE_A);
336 for (
int k = 0; k < 10; k++) {
337 if (addrscan->getResultCount()) {
338 auto addrscanResult = addrscan->getResults().at(0);
339 ip = addrscanResult.ip;
345 wxMilliSleep(1000 * tSec / 10);
352 wxMilliSleep(1000 * tSec / 10);
359 wxMilliSleep(1000 * tSec / 10);
367void CommDriverSignalKNet::OpenWebSocket() {
369 wxLogMessage(wxString::Format(_T(
"Opening Signal K WebSocket client: %s"),
370 m_params.GetDSPort().c_str()));
375 if (m_wsThread->Create() != wxTHREAD_NO_ERROR) {
376 wxLogError(wxT(
"Can't create WebSocketThread!"));
382 GetSocketThreadWatchdogTimer()->Start(1000,
389void CommDriverSignalKNet::CloseWebSocket() {
391 if (IsThreadRunning()) {
392 wxLogMessage(_T(
"Stopping Secondary SignalK Thread"));
394 m_Thread_run_flag = 0;
396 while (IsThreadRunning() && tsec) {
402 if (m_Thread_run_flag <= 0)
403 msg.Printf(_T(
"Stopped in %d sec."), 10 - tsec);
405 msg.Printf(_T(
"Not Stopped after 10 sec."));
412 m_thread_run_flag = 0;
413 printf(
"sending delete\n");
414 m_wsThread->Delete();
418 while (IsThreadRunning() && (++nDeadman < 200)) {
421 printf(
"Closed in %d\n", nDeadman);
427void CommDriverSignalKNet::handle_SK_sentence(
429 rapidjson::Document root;
433 std::string* msg =
event.GetPayload().get();
434 std::string msgTerminated = *msg;
435 msgTerminated.append(
"\r\n");
438 if (root.HasParseError()) {
439 wxLogMessage(wxString::Format(
440 _T(
"SignalKDataStream ERROR: the JSON document is not well-formed:%d"),
441 root.GetParseError()));
445 if (!root.IsObject()) {
446 wxLogMessage(wxString::Format(
447 _T(
"SignalKDataStream ERROR: Message is not a JSON Object: %s"),
454 if (root.HasMember(
"version")) {
455 wxString msg = _T(
"Connected to Signal K server version: ");
456 msg << (root[
"version"].GetString());
460 if (root.HasMember(
"self")) {
461 if (strncmp(root[
"self"].GetString(),
"vessels.", 8) == 0)
462 m_self = (root[
"self"].GetString());
465 m_self = std::string(
"vessels.")
466 .append(root[
"self"].GetString());
469 if (root.HasMember(
"context") && root[
"context"].IsString()) {
470 m_context = root[
"context"].GetString();
474 auto pos =
iface.find(
":");
475 std::string comm_interface =
"";
476 if (pos != std::string::npos) comm_interface =
iface.substr(pos + 1);
477 auto navmsg = std::make_shared<const SignalkMsg>(
478 m_self, m_context, msgTerminated, comm_interface);
479 m_listener.
Notify(std::move(navmsg));
482void CommDriverSignalKNet::initIXNetSystem() { ix::initNetSystem(); };
484void CommDriverSignalKNet::uninitIXNetSystem() { ix::uninitNetSystem(); };
487void CommDriverSignalKNet::handleUpdate(
wxJSONValue &update) {
488 wxString sfixtime =
"";
491 sfixtime = update[
"timestamp"].
AsString();
493 if (update.
HasMember(
"values") && update[
"values"].IsArray()) {
494 for (
int j = 0; j < update[
"values"].
Size(); ++j) {
496 updateItem(item, sfixtime);
501void CommDriverSignalKNet::updateItem(
wxJSONValue &item,
502 wxString &sfixtime) {
504 const wxString &update_path = item[
"path"].
AsString();
507 if (update_path == _T(
"navigation.position") && !value.IsNull()) {
508 updateNavigationPosition(value, sfixtime);
509 }
else if (update_path == _T(
"navigation.speedOverGround") &&
510 m_bGPSValid_SK && !value.IsNull()) {
511 updateNavigationSpeedOverGround(value, sfixtime);
512 }
else if (update_path == _T(
"navigation.courseOverGroundTrue") &&
513 m_bGPSValid_SK && !value.IsNull()) {
514 updateNavigationCourseOverGround(value, sfixtime);
515 }
else if (update_path == _T(
"navigation.courseOverGroundMagnetic")) {
517 else if (update_path ==
518 _T(
"navigation.gnss.satellites"))
520 updateGnssSatellites(value, sfixtime);
521 }
else if (update_path ==
522 _T(
"navigation.gnss.satellitesInView"))
524 updateGnssSatellites(value, sfixtime);
525 }
else if (update_path == _T(
"navigation.headingTrue")) {
527 updateHeadingTrue(value, sfixtime);
528 }
else if (update_path == _T(
"navigation.headingMagnetic")) {
530 updateHeadingMagnetic(value, sfixtime);
531 }
else if (update_path == _T(
"navigation.magneticVariation")) {
533 updateMagneticVariance(value, sfixtime);
540 writer.
Write(item, dbg);
541 wxString msg( _T(
"update: ") );
549void CommDriverSignalKNet::updateNavigationPosition(
551 if ((value.HasMember(
"latitude" && value[
"latitude"].IsDouble())) &&
552 (value.HasMember(
"longitude") && value[
"longitude"].IsDouble())) {
554 m_lat = value[
"latitude"].AsDouble();
555 m_lon = value[
"longitude"].AsDouble();
556 m_bGPSValid_SK =
true;
558 m_bGPSValid_SK =
false;
563void CommDriverSignalKNet::updateNavigationSpeedOverGround(
565 double sog_ms = value.AsDouble();
566 double sog_knot = sog_ms * ms_to_knot_factor;
571void CommDriverSignalKNet::updateNavigationCourseOverGround(
573 double cog_rad = value.AsDouble();
574 double cog_deg = GEODESIC_RAD2DEG(cog_rad);
579void CommDriverSignalKNet::updateGnssSatellites(
wxJSONValue &value,
580 const wxString &sfixtime) {
583 if (value.AsInt() > 0) {
584 m_frame->setSatelitesInView(value.AsInt());
587 }
else if ((value.HasMember(
"count") && value[
"count"].IsInt())) {
588 m_frame->setSatelitesInView(value[
"count"].AsInt());
594void CommDriverSignalKNet::updateHeadingTrue(
wxJSONValue &value,
595 const wxString &sfixtime) {
596 m_hdt = GEODESIC_RAD2DEG(value.AsDouble());
599void CommDriverSignalKNet::updateHeadingMagnetic(
601 m_hdm = GEODESIC_RAD2DEG(value.AsDouble());
604void CommDriverSignalKNet::updateMagneticVariance(
606 m_var = GEODESIC_RAD2DEG(value.AsDouble());
const std::string iface
Physical device for 0183, else a unique string.
Interface implemented by transport layer and possible other parties like test code which should handl...
virtual void Notify(std::shared_ptr< const NavMsg > message)=0
Handle a received message.
Define an action to be performed when a KeyProvider is notified.
void Init(const KeyProvider &kp, std::function< void(ObservedEvt &ev)> action)
Initiate an object yet not listening.
Custom event class for OpenCPN's notification system.
The JSON value class implementation.
int Size() const
Return the size of the array or map stored in this value.
bool HasMember(unsigned index) const
Return TRUE if the object contains an element at the specified index.
wxString AsString() const
Return the stored value as a wxWidget's string.
The JSON document writer.
void Write(const wxJSONValue &value, wxString &str)
Write the JSONvalue object to a JSON text.
Driver registration container, a singleton.
Raw messages layer, supports sending and recieving navmsg messages.
Driver statistics report.
unsigned rx_count
Number of bytes received since program start.
unsigned error_count
Number of detected errors since program start.
Suspend/resume and new devices events exchange point.