32#include "rapidjson/document.h"
34#include "model/comm_drv_signalk_net.h"
37#include "model/geodesic.h"
39#include "wxServDisc.h"
41#include "observable.h"
43#include "ixwebsocket/IXNetSystem.h"
44#include "ixwebsocket/IXWebSocket.h"
45#include "ixwebsocket/IXUserAgent.h"
46#include "ixwebsocket/IXSocketTLSOptions.h"
47using namespace std::literals::chrono_literals;
49const int kTimerSocket = 9006;
56 const wxString& PortName,
57 const wxString& strBaudRate);
61 bool SetOutMsg(
const wxString& msg);
65 void ThreadMessage(
const wxString& msg);
66 bool OpenComPortPhysical(
const wxString& com_name,
int baud_rate);
67 void CloseComPortPhysical();
68 size_t WriteComPortPhysical(std::vector<unsigned char> msg);
69 size_t WriteComPortPhysical(
unsigned char* msg,
size_t length);
70 void SetGatewayOperationMode(
void);
74 wxString m_FullPortName;
76 unsigned char* put_ptr;
77 unsigned char* tak_ptr;
79 unsigned char* rx_buffer;
93 : wxEvent(
id, commandType) {};
97 void SetPayload(std::shared_ptr<std::string> data) { m_payload = data; }
98 std::shared_ptr<std::string> GetPayload() {
return m_payload; }
101 wxEvent* Clone()
const {
103 newevent->m_payload = this->m_payload;
108 std::shared_ptr<std::string> m_payload;
116 wxEvtHandler* consumer,
const std::string& token);
117 virtual void* Entry();
122 void HandleMessage(
const std::string& message);
123 wxEvtHandler* s_wsSKConsumer;
124 wxIPV4address m_address;
125 wxEvtHandler* m_consumer;
131 mutable std::mutex m_stats_mutex;
135 wxIPV4address address, wxEvtHandler* consumer,
136 const std::string& token)
137 : m_address(address),
138 m_consumer(consumer),
139 m_parentStream(parent),
141 resume_listener.
Init(SystemEvents::GetInstance().evt_resume,
145 wxLogDebug(
"WebSocketThread: restarted");
149void* WebSocketThread::Entry() {
150 using namespace std::chrono_literals;
151 bool not_done =
true;
153 m_parentStream->SetThreadRunning(
true);
155 s_wsSKConsumer = m_consumer;
157 wxString host = m_address.IPAddress();
158 int port = m_address.Service();
161 std::stringstream wsAddress;
162 wsAddress <<
"ws://" << host <<
":" << port
163 <<
"/signalk/v1/stream?subscribe=all&sendCachedValues=false";
164 std::stringstream wssAddress;
165 wssAddress <<
"wss://" << host <<
":" << port
166 <<
"/signalk/v1/stream?subscribe=all&sendCachedValues=false";
168 if (!m_token.empty()) {
169 wsAddress <<
"&token=" << m_token;
170 wssAddress <<
"&token=" << m_token;
173 ws.setUrl(wssAddress.str());
174 ix::SocketTLSOptions opt;
175 opt.disable_hostname_validation =
true;
177 ws.setTLSOptions(opt);
178 ws.setPingInterval(30);
180 auto message_callback = [&](
const ix::WebSocketMessagePtr& msg) {
181 if (msg->type == ix::WebSocketMessageType::Message) {
182 HandleMessage(msg->str);
183 }
else if (msg->type == ix::WebSocketMessageType::Open) {
184 wxLogDebug(
"websocket: Connection established");
185 std::lock_guard lock(m_stats_mutex);
186 m_driver_stats.available =
true;
187 }
else if (msg->type == ix::WebSocketMessageType::Close) {
188 wxLogDebug(
"websocket: Connection disconnected");
189 std::lock_guard lock(m_stats_mutex);
190 m_driver_stats.available =
false;
191 }
else if (msg->type == ix::WebSocketMessageType::Error) {
192 std::lock_guard lock(m_stats_mutex);
194 wxLogDebug(
"websocket: error: %s", msg->errorInfo.reason.c_str());
195 ws.getUrl() == wsAddress.str() ? ws.setUrl(wssAddress.str())
196 : ws.setUrl(wsAddress.str());
200 ws.setOnMessageCallback(message_callback);
203 std::lock_guard lock(m_stats_mutex);
204 m_driver_stats.driver_bus = NavAddr::Bus::Signalk;
205 m_driver_stats.driver_iface = m_parentStream->m_params.GetStrippedDSPort();
206 m_driver_stats.available =
false;
211 while (m_parentStream->m_Thread_run_flag > 0) {
212 std::this_thread::sleep_for(100ms);
216 m_parentStream->SetThreadRunning(
false);
217 m_parentStream->m_Thread_run_flag = -1;
219 std::lock_guard lock(m_stats_mutex);
220 m_driver_stats.available =
false;
227 std::lock_guard lock(m_stats_mutex);
228 return m_driver_stats;
231void WebSocketThread::HandleMessage(
const std::string& message) {
232 if (s_wsSKConsumer) {
234 auto buffer = std::make_shared<std::string>(message);
236 signalKEvent.SetPayload(buffer);
237 s_wsSKConsumer->AddPendingEvent(signalKEvent);
251 m_Thread_run_flag(-1),
253 m_listener(listener),
254 m_stats_timer(*this, 2s) {
256 Bind(wxEVT_COMMDRIVER_SIGNALK_NET, &CommDriverSignalKNet::handle_SK_sentence,
259 m_addr.Hostname(params->NetworkAddress);
260 m_addr.Service(params->NetworkPort);
261 m_token = params->AuthToken;
262 m_socketread_watchdog_timer.SetOwner(
this, kTimerSocket);
264 m_threadActive =
false;
267 m_driver_stats.driver_bus = NavAddr::Bus::Signalk;
268 m_driver_stats.driver_iface = m_params.GetStrippedDSPort();
269 m_driver_stats.available =
false;
274CommDriverSignalKNet::~CommDriverSignalKNet() { Close(); }
278 return m_wsThread->GetStats();
280 return m_driver_stats;
283void CommDriverSignalKNet::Open(
void) {
284 wxString discoveredIP;
291 std::string serviceIdent =
292 std::string(
"_signalk-ws._tcp.local.");
294 if (m_params->AutoSKDiscover) {
295 if (DiscoverSKServer(serviceIdent, discoveredIP, discoveredPort,
297 wxLogDebug(wxString::Format(
298 _T(
"SK server autodiscovery finds WebSocket service: %s:%d"),
299 discoveredIP.c_str(), discoveredPort));
300 m_addr.Hostname(discoveredIP);
301 m_addr.Service(discoveredPort);
306 params->NetworkAddress = discoveredIP;
307 params->NetworkPort = discoveredPort;
309 wxLogDebug(_T(
"SK server autodiscovery finds no WebSocket server."));
315void CommDriverSignalKNet::Close() { CloseWebSocket(); }
317bool CommDriverSignalKNet::DiscoverSKServer(std::string serviceIdent,
318 wxString& ip,
int& port,
int tSec) {
319 wxServDisc* servscan =
320 new wxServDisc(0, wxString(serviceIdent.c_str()), QTYPE_PTR);
322 for (
int i = 0; i < 10; i++) {
323 if (servscan->getResultCount()) {
324 auto result = servscan->getResults().at(0);
327 wxServDisc* namescan =
new wxServDisc(0, result.name, QTYPE_SRV);
328 for (
int j = 0; j < 10; j++) {
329 if (namescan->getResultCount()) {
330 auto namescanResult = namescan->getResults().at(0);
331 port = namescanResult.port;
334 wxServDisc* addrscan =
335 new wxServDisc(0, namescanResult.name, QTYPE_A);
336 for (
int k = 0; k < 10; k++) {
337 if (addrscan->getResultCount()) {
338 auto addrscanResult = addrscan->getResults().at(0);
339 ip = addrscanResult.ip;
345 wxMilliSleep(1000 * tSec / 10);
352 wxMilliSleep(1000 * tSec / 10);
359 wxMilliSleep(1000 * tSec / 10);
367void CommDriverSignalKNet::OpenWebSocket() {
369 wxLogMessage(wxString::Format(_T(
"Opening Signal K WebSocket client: %s"),
370 m_params.GetDSPort().c_str()));
375 if (m_wsThread->Create() != wxTHREAD_NO_ERROR) {
376 wxLogError(wxT(
"Can't create WebSocketThread!"));
382 GetSocketThreadWatchdogTimer()->Start(1000,
389void CommDriverSignalKNet::CloseWebSocket() {
391 if (IsThreadRunning()) {
392 wxLogMessage(_T(
"Stopping Secondary SignalK Thread"));
393 m_stats_timer.Stop();
395 m_Thread_run_flag = 0;
397 while (IsThreadRunning() && tsec) {
403 if (m_Thread_run_flag <= 0)
404 msg.Printf(_T(
"Stopped in %d sec."), 10 - tsec);
406 msg.Printf(_T(
"Not Stopped after 10 sec."));
413 m_thread_run_flag = 0;
414 printf(
"sending delete\n");
415 m_wsThread->Delete();
419 while (IsThreadRunning() && (++nDeadman < 200)) {
422 printf(
"Closed in %d\n", nDeadman);
428void CommDriverSignalKNet::handle_SK_sentence(
430 rapidjson::Document root;
434 std::string* msg =
event.GetPayload().get();
435 std::string msgTerminated = *msg;
436 msgTerminated.append(
"\r\n");
439 if (root.HasParseError()) {
440 wxLogMessage(wxString::Format(
441 _T(
"SignalKDataStream ERROR: the JSON document is not well-formed:%d"),
442 root.GetParseError()));
446 if (!root.IsObject()) {
447 wxLogMessage(wxString::Format(
448 _T(
"SignalKDataStream ERROR: Message is not a JSON Object: %s"),
455 if (root.HasMember(
"version")) {
456 wxString msg = _T(
"Connected to Signal K server version: ");
457 msg << (root[
"version"].GetString());
461 if (root.HasMember(
"self")) {
462 if (strncmp(root[
"self"].GetString(),
"vessels.", 8) == 0)
463 m_self = (root[
"self"].GetString());
466 m_self = std::string(
"vessels.")
467 .append(root[
"self"].GetString());
470 if (root.HasMember(
"context") && root[
"context"].IsString()) {
471 m_context = root[
"context"].GetString();
475 auto pos =
iface.find(
":");
476 std::string comm_interface =
"";
477 if (pos != std::string::npos) comm_interface =
iface.substr(pos + 1);
478 auto navmsg = std::make_shared<const SignalkMsg>(
479 m_self, m_context, msgTerminated, comm_interface);
480 m_listener.
Notify(std::move(navmsg));
483void CommDriverSignalKNet::initIXNetSystem() { ix::initNetSystem(); };
485void CommDriverSignalKNet::uninitIXNetSystem() { ix::uninitNetSystem(); };
488void CommDriverSignalKNet::handleUpdate(
wxJSONValue &update) {
489 wxString sfixtime =
"";
492 sfixtime = update[
"timestamp"].
AsString();
494 if (update.
HasMember(
"values") && update[
"values"].IsArray()) {
495 for (
int j = 0; j < update[
"values"].
Size(); ++j) {
497 updateItem(item, sfixtime);
502void CommDriverSignalKNet::updateItem(
wxJSONValue &item,
503 wxString &sfixtime) {
505 const wxString &update_path = item[
"path"].
AsString();
508 if (update_path == _T(
"navigation.position") && !value.IsNull()) {
509 updateNavigationPosition(value, sfixtime);
510 }
else if (update_path == _T(
"navigation.speedOverGround") &&
511 m_bGPSValid_SK && !value.IsNull()) {
512 updateNavigationSpeedOverGround(value, sfixtime);
513 }
else if (update_path == _T(
"navigation.courseOverGroundTrue") &&
514 m_bGPSValid_SK && !value.IsNull()) {
515 updateNavigationCourseOverGround(value, sfixtime);
516 }
else if (update_path == _T(
"navigation.courseOverGroundMagnetic")) {
518 else if (update_path ==
519 _T(
"navigation.gnss.satellites"))
521 updateGnssSatellites(value, sfixtime);
522 }
else if (update_path ==
523 _T(
"navigation.gnss.satellitesInView"))
525 updateGnssSatellites(value, sfixtime);
526 }
else if (update_path == _T(
"navigation.headingTrue")) {
528 updateHeadingTrue(value, sfixtime);
529 }
else if (update_path == _T(
"navigation.headingMagnetic")) {
531 updateHeadingMagnetic(value, sfixtime);
532 }
else if (update_path == _T(
"navigation.magneticVariation")) {
534 updateMagneticVariance(value, sfixtime);
541 writer.
Write(item, dbg);
542 wxString msg( _T(
"update: ") );
550void CommDriverSignalKNet::updateNavigationPosition(
552 if ((value.HasMember(
"latitude" && value[
"latitude"].IsDouble())) &&
553 (value.HasMember(
"longitude") && value[
"longitude"].IsDouble())) {
555 m_lat = value[
"latitude"].AsDouble();
556 m_lon = value[
"longitude"].AsDouble();
557 m_bGPSValid_SK =
true;
559 m_bGPSValid_SK =
false;
564void CommDriverSignalKNet::updateNavigationSpeedOverGround(
566 double sog_ms = value.AsDouble();
567 double sog_knot = sog_ms * ms_to_knot_factor;
572void CommDriverSignalKNet::updateNavigationCourseOverGround(
574 double cog_rad = value.AsDouble();
575 double cog_deg = GEODESIC_RAD2DEG(cog_rad);
580void CommDriverSignalKNet::updateGnssSatellites(
wxJSONValue &value,
581 const wxString &sfixtime) {
584 if (value.AsInt() > 0) {
585 m_frame->setSatelitesInView(value.AsInt());
588 }
else if ((value.HasMember(
"count") && value[
"count"].IsInt())) {
589 m_frame->setSatelitesInView(value[
"count"].AsInt());
595void CommDriverSignalKNet::updateHeadingTrue(
wxJSONValue &value,
596 const wxString &sfixtime) {
597 m_hdt = GEODESIC_RAD2DEG(value.AsDouble());
600void CommDriverSignalKNet::updateHeadingMagnetic(
602 m_hdm = GEODESIC_RAD2DEG(value.AsDouble());
605void CommDriverSignalKNet::updateMagneticVariance(
607 m_var = GEODESIC_RAD2DEG(value.AsDouble());
const std::string iface
Physical device for 0183, else a unique string.
DriverStats GetDriverStats() const override
Get the Driver Statistics.
Interface for handling incoming messages.
virtual void Notify(std::shared_ptr< const NavMsg > message)=0
Handle a received message.
Define an action to be performed when a KeyProvider is notified.
void Init(const KeyProvider &kp, std::function< void(ObservedEvt &ev)> action)
Initiate an object yet not listening.
Custom event class for OpenCPN's notification system.
The JSON value class implementation.
int Size() const
Return the size of the array or map stored in this value.
bool HasMember(unsigned index) const
Return TRUE if the object contains an element at the specified index.
wxString AsString() const
Return the stored value as a wxWidget's string.
The JSON document writer.
void Write(const wxJSONValue &value, wxString &str)
Write the JSONvalue object to a JSON text.
Driver registration container, a singleton.
Raw messages layer, supports sending and receiving navmsg messages.
Driver statistics report.
unsigned rx_count
Number of bytes received since program start.
unsigned error_count
Number of detected errors since program start.
Suspend/resume and new devices events exchange point.