OpenCPN Partial API docs
Loading...
Searching...
No Matches
comm_drv_signalk_net.cpp
1/***************************************************************************
2 *
3 * Project: OpenCPN
4 * Purpose:
5 * Author: David Register, Alec Leamas
6 *
7 ***************************************************************************
8 * Copyright (C) 2022 by David Register, Alec Leamas *
9 * *
10 * This program is free software; you can redistribute it and/or modify *
11 * it under the terms of the GNU General Public License as published by *
12 * the Free Software Foundation; either version 2 of the License, or *
13 * (at your option) any later version. *
14 * *
15 * This program is distributed in the hope that it will be useful, *
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
18 * GNU General Public License for more details. *
19 * *
20 * You should have received a copy of the GNU General Public License *
21 * along with this program; if not, write to the *
22 * Free Software Foundation, Inc., *
23 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. *
24 **************************************************************************/
25
26#include <vector>
27#include <mutex> // std::mutex
28#include <queue> // std::queue
29#include <chrono>
30#include <thread>
31
32#include "rapidjson/document.h"
33
34#include "model/comm_drv_signalk_net.h"
37#include "model/geodesic.h"
38#include "model/sys_events.h"
39#include "wxServDisc.h"
40
41#include "observable.h"
42
43#include "ixwebsocket/IXNetSystem.h"
44#include "ixwebsocket/IXWebSocket.h"
45#include "ixwebsocket/IXUserAgent.h"
46#include "ixwebsocket/IXSocketTLSOptions.h"
47using namespace std::literals::chrono_literals;
48
49const int kTimerSocket = 9006;
50
51class CommDriverSignalKNetEvent; // fwd
52
53class CommDriverSignalKNetThread : public wxThread {
54public:
56 const wxString& PortName,
57 const wxString& strBaudRate);
58
60 void* Entry();
61 bool SetOutMsg(const wxString& msg);
62 void OnExit(void);
63
64private:
65 void ThreadMessage(const wxString& msg);
66 bool OpenComPortPhysical(const wxString& com_name, int baud_rate);
67 void CloseComPortPhysical();
68 size_t WriteComPortPhysical(std::vector<unsigned char> msg);
69 size_t WriteComPortPhysical(unsigned char* msg, size_t length);
70 void SetGatewayOperationMode(void);
71
72 CommDriverSignalKNet* m_pParentDriver;
73 wxString m_PortName;
74 wxString m_FullPortName;
75
76 unsigned char* put_ptr;
77 unsigned char* tak_ptr;
78
79 unsigned char* rx_buffer;
80
81 int m_baud;
82 int m_n_timeout;
83
84 // n2k_atomic_queue<char*> out_que;
85};
86
88wxDECLARE_EVENT(wxEVT_COMMDRIVER_SIGNALK_NET, CommDriverSignalKNetEvent);
89
90class CommDriverSignalKNetEvent : public wxEvent {
91public:
92 CommDriverSignalKNetEvent(wxEventType commandType = wxEVT_NULL, int id = 0)
93 : wxEvent(id, commandType) {};
95
96 // accessors
97 void SetPayload(std::shared_ptr<std::string> data) { m_payload = data; }
98 std::shared_ptr<std::string> GetPayload() { return m_payload; }
99
100 // required for sending with wxPostEvent()
101 wxEvent* Clone() const {
103 newevent->m_payload = this->m_payload;
104 return newevent;
105 };
106
107private:
108 std::shared_ptr<std::string> m_payload;
109};
110
111// WebSocket implementation
112
113class WebSocketThread : public wxThread {
114public:
115 WebSocketThread(CommDriverSignalKNet* parent, wxIPV4address address,
116 wxEvtHandler* consumer, const std::string& token);
117 virtual void* Entry();
118
119 DriverStats GetStats() const;
120
121private:
122 void HandleMessage(const std::string& message);
123 wxEvtHandler* s_wsSKConsumer;
124 wxIPV4address m_address;
125 wxEvtHandler* m_consumer;
126 CommDriverSignalKNet* m_parentStream;
127 std::string m_token;
128 ix::WebSocket ws;
129 ObsListener resume_listener;
130 DriverStats m_driver_stats;
131 mutable std::mutex m_stats_mutex;
132};
133
134WebSocketThread::WebSocketThread(CommDriverSignalKNet* parent,
135 wxIPV4address address, wxEvtHandler* consumer,
136 const std::string& token)
137 : m_address(address),
138 m_consumer(consumer),
139 m_parentStream(parent),
140 m_token(token) {
141 resume_listener.Init(SystemEvents::GetInstance().evt_resume,
142 [&](ObservedEvt& ev) {
143 ws.stop();
144 ws.start();
145 wxLogDebug("WebSocketThread: restarted");
146 });
147}
148
149void* WebSocketThread::Entry() {
150 using namespace std::chrono_literals;
151 bool not_done = true;
152
153 m_parentStream->SetThreadRunning(true);
154
155 s_wsSKConsumer = m_consumer;
156
157 wxString host = m_address.IPAddress();
158 int port = m_address.Service();
159
160 // Craft the address string
161 std::stringstream wsAddress;
162 wsAddress << "ws://" << host << ":" << port
163 << "/signalk/v1/stream?subscribe=all&sendCachedValues=false";
164 std::stringstream wssAddress;
165 wssAddress << "wss://" << host << ":" << port
166 << "/signalk/v1/stream?subscribe=all&sendCachedValues=false";
167
168 if (!m_token.empty()) {
169 wsAddress << "&token=" << m_token;
170 wssAddress << "&token=" << m_token;
171 }
172
173 ws.setUrl(wssAddress.str());
174 ix::SocketTLSOptions opt;
175 opt.disable_hostname_validation = true;
176 opt.caFile = "NONE";
177 ws.setTLSOptions(opt);
178 ws.setPingInterval(30);
179
180 auto message_callback = [&](const ix::WebSocketMessagePtr& msg) {
181 if (msg->type == ix::WebSocketMessageType::Message) {
182 HandleMessage(msg->str);
183 } else if (msg->type == ix::WebSocketMessageType::Open) {
184 wxLogDebug("websocket: Connection established");
185 std::lock_guard lock(m_stats_mutex);
186 m_driver_stats.available = true;
187 } else if (msg->type == ix::WebSocketMessageType::Close) {
188 wxLogDebug("websocket: Connection disconnected");
189 std::lock_guard lock(m_stats_mutex);
190 m_driver_stats.available = false;
191 } else if (msg->type == ix::WebSocketMessageType::Error) {
192 std::lock_guard lock(m_stats_mutex);
193 m_driver_stats.error_count++;
194 wxLogDebug("websocket: error: %s", msg->errorInfo.reason.c_str());
195 ws.getUrl() == wsAddress.str() ? ws.setUrl(wssAddress.str())
196 : ws.setUrl(wsAddress.str());
197 }
198 };
199
200 ws.setOnMessageCallback(message_callback);
201
202 {
203 std::lock_guard lock(m_stats_mutex);
204 m_driver_stats.driver_bus = NavAddr::Bus::Signalk;
205 m_driver_stats.driver_iface = m_parentStream->m_params.GetStrippedDSPort();
206 m_driver_stats.available = false;
207 }
208
209 ws.start();
210
211 while (m_parentStream->m_Thread_run_flag > 0) {
212 std::this_thread::sleep_for(100ms);
213 }
214
215 ws.stop();
216 m_parentStream->SetThreadRunning(false);
217 m_parentStream->m_Thread_run_flag = -1;
218 {
219 std::lock_guard lock(m_stats_mutex);
220 m_driver_stats.available = false;
221 }
222
223 return 0;
224}
225
226DriverStats WebSocketThread::GetStats() const {
227 std::lock_guard lock(m_stats_mutex);
228 return m_driver_stats;
229}
230
231void WebSocketThread::HandleMessage(const std::string& message) {
232 if (s_wsSKConsumer) {
233 CommDriverSignalKNetEvent signalKEvent(wxEVT_COMMDRIVER_SIGNALK_NET, 0);
234 auto buffer = std::make_shared<std::string>(message);
235
236 signalKEvent.SetPayload(buffer);
237 s_wsSKConsumer->AddPendingEvent(signalKEvent);
238 m_driver_stats.rx_count++;
239 }
240}
241
242//========================================================================
243/* CommDriverSignalKNet implementation
244 * */
245
246wxDEFINE_EVENT(wxEVT_COMMDRIVER_SIGNALK_NET, CommDriverSignalKNetEvent);
247
248CommDriverSignalKNet::CommDriverSignalKNet(const ConnectionParams* params,
249 DriverListener& listener)
250 : CommDriverSignalK(params->GetStrippedDSPort()),
251 m_Thread_run_flag(-1),
252 m_params(*params),
253 m_listener(listener),
254 m_stats_timer(*this, 2s) {
255 // Prepare the wxEventHandler to accept events from the actual hardware thread
256 Bind(wxEVT_COMMDRIVER_SIGNALK_NET, &CommDriverSignalKNet::handle_SK_sentence,
257 this);
258
259 m_addr.Hostname(params->NetworkAddress);
260 m_addr.Service(params->NetworkPort);
261 m_token = params->AuthToken;
262 m_socketread_watchdog_timer.SetOwner(this, kTimerSocket);
263 m_wsThread = NULL;
264 m_threadActive = false;
265
266 // Dummy Driver Stats, may be polled before worker thread is active
267 m_driver_stats.driver_bus = NavAddr::Bus::Signalk;
268 m_driver_stats.driver_iface = m_params.GetStrippedDSPort();
269 m_driver_stats.available = false;
270
271 Open();
272}
273
274CommDriverSignalKNet::~CommDriverSignalKNet() { Close(); }
275
277 if (m_wsThread)
278 return m_wsThread->GetStats();
279 else
280 return m_driver_stats;
281}
282
283void CommDriverSignalKNet::Open(void) {
284 wxString discoveredIP;
285#if 0
286 int discoveredPort;
287#endif
288
289 // if (m_useWebSocket)
290 {
291 std::string serviceIdent =
292 std::string("_signalk-ws._tcp.local."); // Works for node.js server
293#if 0
294 if (m_params->AutoSKDiscover) {
295 if (DiscoverSKServer(serviceIdent, discoveredIP, discoveredPort,
296 1)) { // 1 second scan
297 wxLogDebug(wxString::Format(
298 _T("SK server autodiscovery finds WebSocket service: %s:%d"),
299 discoveredIP.c_str(), discoveredPort));
300 m_addr.Hostname(discoveredIP);
301 m_addr.Service(discoveredPort);
302
303 // Update the connection params, by pointer to item in global params
304 // array
305 ConnectionParams *params = (ConnectionParams *)m_params; // non-const
306 params->NetworkAddress = discoveredIP;
307 params->NetworkPort = discoveredPort;
308 } else
309 wxLogDebug(_T("SK server autodiscovery finds no WebSocket server."));
310 }
311#endif
312 OpenWebSocket();
313 }
314}
315void CommDriverSignalKNet::Close() { CloseWebSocket(); }
316
317bool CommDriverSignalKNet::DiscoverSKServer(std::string serviceIdent,
318 wxString& ip, int& port, int tSec) {
319 wxServDisc* servscan =
320 new wxServDisc(0, wxString(serviceIdent.c_str()), QTYPE_PTR);
321
322 for (int i = 0; i < 10; i++) {
323 if (servscan->getResultCount()) {
324 auto result = servscan->getResults().at(0);
325 delete servscan;
326
327 wxServDisc* namescan = new wxServDisc(0, result.name, QTYPE_SRV);
328 for (int j = 0; j < 10; j++) {
329 if (namescan->getResultCount()) {
330 auto namescanResult = namescan->getResults().at(0);
331 port = namescanResult.port;
332 delete namescan;
333
334 wxServDisc* addrscan =
335 new wxServDisc(0, namescanResult.name, QTYPE_A);
336 for (int k = 0; k < 10; k++) {
337 if (addrscan->getResultCount()) {
338 auto addrscanResult = addrscan->getResults().at(0);
339 ip = addrscanResult.ip;
340 delete addrscan;
341 return true;
342 break;
343 } else {
344 wxYield();
345 wxMilliSleep(1000 * tSec / 10);
346 }
347 }
348 delete addrscan;
349 return false;
350 } else {
351 wxYield();
352 wxMilliSleep(1000 * tSec / 10);
353 }
354 }
355 delete namescan;
356 return false;
357 } else {
358 wxYield();
359 wxMilliSleep(1000 * tSec / 10);
360 }
361 }
362
363 delete servscan;
364 return false;
365}
366
367void CommDriverSignalKNet::OpenWebSocket() {
368 // printf("OpenWebSocket\n");
369 wxLogMessage(wxString::Format(_T("Opening Signal K WebSocket client: %s"),
370 m_params.GetDSPort().c_str()));
371
372 // Start a thread to run the client without blocking
373
374 m_wsThread = new WebSocketThread(this, GetAddr(), this, m_token);
375 if (m_wsThread->Create() != wxTHREAD_NO_ERROR) {
376 wxLogError(wxT("Can't create WebSocketThread!"));
377
378 return;
379 }
380
381 ResetWatchdog();
382 GetSocketThreadWatchdogTimer()->Start(1000,
383 wxTIMER_ONE_SHOT); // Start the dog
384 SetThreadRunFlag(1);
385
386 m_wsThread->Run();
387}
388
389void CommDriverSignalKNet::CloseWebSocket() {
390 if (m_wsThread) {
391 if (IsThreadRunning()) {
392 wxLogMessage(_T("Stopping Secondary SignalK Thread"));
393 m_stats_timer.Stop();
394
395 m_Thread_run_flag = 0;
396 int tsec = 10;
397 while (IsThreadRunning() && tsec) {
398 wxSleep(1);
399 tsec--;
400 }
401
402 wxString msg;
403 if (m_Thread_run_flag <= 0)
404 msg.Printf(_T("Stopped in %d sec."), 10 - tsec);
405 else
406 msg.Printf(_T("Not Stopped after 10 sec."));
407 wxLogMessage(msg);
408 }
409
410 wxMilliSleep(100);
411
412#if 0
413 m_thread_run_flag = 0;
414 printf("sending delete\n");
415 m_wsThread->Delete();
416 wxMilliSleep(100);
417
418 int nDeadman = 0;
419 while (IsThreadRunning() && (++nDeadman < 200)) { // spin for max 2 secs.
420 wxMilliSleep(10);
421 }
422 printf("Closed in %d\n", nDeadman);
423 wxMilliSleep(100);
424#endif
425 }
426}
427
428void CommDriverSignalKNet::handle_SK_sentence(
430 rapidjson::Document root;
431
432 // LOG_DEBUG("%s\n", msg.c_str());
433
434 std::string* msg = event.GetPayload().get();
435 std::string msgTerminated = *msg;
436 msgTerminated.append("\r\n");
437
438 root.Parse(*msg);
439 if (root.HasParseError()) {
440 wxLogMessage(wxString::Format(
441 _T("SignalKDataStream ERROR: the JSON document is not well-formed:%d"),
442 root.GetParseError()));
443 return;
444 }
445
446 if (!root.IsObject()) {
447 wxLogMessage(wxString::Format(
448 _T("SignalKDataStream ERROR: Message is not a JSON Object: %s"),
449 msg->c_str()));
450 return;
451 }
452
453 // Decode just enough of string to extract some identifiers
454 // such as the sK version, "self" context, and target context
455 if (root.HasMember("version")) {
456 wxString msg = _T("Connected to Signal K server version: ");
457 msg << (root["version"].GetString());
458 wxLogMessage(msg);
459 }
460
461 if (root.HasMember("self")) {
462 if (strncmp(root["self"].GetString(), "vessels.", 8) == 0)
463 m_self = (root["self"].GetString()); // for java server, and OpenPlotter
464 // node.js server 1.20
465 else
466 m_self = std::string("vessels.")
467 .append(root["self"].GetString()); // for Node.js server
468 }
469
470 if (root.HasMember("context") && root["context"].IsString()) {
471 m_context = root["context"].GetString();
472 }
473
474 // Notify all listeners
475 auto pos = iface.find(":");
476 std::string comm_interface = "";
477 if (pos != std::string::npos) comm_interface = iface.substr(pos + 1);
478 auto navmsg = std::make_shared<const SignalkMsg>(
479 m_self, m_context, msgTerminated, comm_interface);
480 m_listener.Notify(std::move(navmsg));
481}
482
483void CommDriverSignalKNet::initIXNetSystem() { ix::initNetSystem(); };
484
485void CommDriverSignalKNet::uninitIXNetSystem() { ix::uninitNetSystem(); };
486
487#if 0
488void CommDriverSignalKNet::handleUpdate(wxJSONValue &update) {
489 wxString sfixtime = "";
490
491 if (update.HasMember("timestamp")) {
492 sfixtime = update["timestamp"].AsString();
493 }
494 if (update.HasMember("values") && update["values"].IsArray()) {
495 for (int j = 0; j < update["values"].Size(); ++j) {
496 wxJSONValue &item = update["values"][j];
497 updateItem(item, sfixtime);
498 }
499 }
500}
501
502void CommDriverSignalKNet::updateItem(wxJSONValue &item,
503 wxString &sfixtime) {
504 if (item.HasMember("path") && item.HasMember("value")) {
505 const wxString &update_path = item["path"].AsString();
506 wxJSONValue &value = item["value"];
507
508 if (update_path == _T("navigation.position") && !value.IsNull()) {
509 updateNavigationPosition(value, sfixtime);
510 } else if (update_path == _T("navigation.speedOverGround") &&
511 m_bGPSValid_SK && !value.IsNull()) {
512 updateNavigationSpeedOverGround(value, sfixtime);
513 } else if (update_path == _T("navigation.courseOverGroundTrue") &&
514 m_bGPSValid_SK && !value.IsNull()) {
515 updateNavigationCourseOverGround(value, sfixtime);
516 } else if (update_path == _T("navigation.courseOverGroundMagnetic")) {
517 }
518 else if (update_path ==
519 _T("navigation.gnss.satellites")) // From GGA sats in use
520 {
521 /*if (g_priSats >= 2)*/ updateGnssSatellites(value, sfixtime);
522 } else if (update_path ==
523 _T("navigation.gnss.satellitesInView")) // From GSV sats in view
524 {
525 /*if (g_priSats >= 3)*/ updateGnssSatellites(value, sfixtime);
526 } else if (update_path == _T("navigation.headingTrue")) {
527 if(!value.IsNull())
528 updateHeadingTrue(value, sfixtime);
529 } else if (update_path == _T("navigation.headingMagnetic")) {
530 if(!value.IsNull())
531 updateHeadingMagnetic(value, sfixtime);
532 } else if (update_path == _T("navigation.magneticVariation")) {
533 if(!value.IsNull())
534 updateMagneticVariance(value, sfixtime);
535 } else {
536 // wxLogMessage(wxString::Format(_T("** Signal K unhandled update: %s"),
537 // update_path));
538#if 0
539 wxString dbg;
540 wxJSONWriter writer;
541 writer.Write(item, dbg);
542 wxString msg( _T("update: ") );
543 msg.append(dbg);
544 wxLogMessage(msg);
545#endif
546 }
547 }
548}
549
550void CommDriverSignalKNet::updateNavigationPosition(
551 wxJSONValue &value, const wxString &sfixtime) {
552 if ((value.HasMember("latitude" && value["latitude"].IsDouble())) &&
553 (value.HasMember("longitude") && value["longitude"].IsDouble())) {
554 // wxLogMessage(_T(" ***** Position Update"));
555 m_lat = value["latitude"].AsDouble();
556 m_lon = value["longitude"].AsDouble();
557 m_bGPSValid_SK = true;
558 } else {
559 m_bGPSValid_SK = false;
560 }
561}
562
563
564void CommDriverSignalKNet::updateNavigationSpeedOverGround(
565 wxJSONValue &value, const wxString &sfixtime){
566 double sog_ms = value.AsDouble();
567 double sog_knot = sog_ms * ms_to_knot_factor;
568 // wxLogMessage(wxString::Format(_T(" ***** SOG: %f, %f"), sog_ms, sog_knot));
569 m_sog = sog_knot;
570}
571
572void CommDriverSignalKNet::updateNavigationCourseOverGround(
573 wxJSONValue &value, const wxString &sfixtime) {
574 double cog_rad = value.AsDouble();
575 double cog_deg = GEODESIC_RAD2DEG(cog_rad);
576 // wxLogMessage(wxString::Format(_T(" ***** COG: %f, %f"), cog_rad, cog_deg));
577 m_cog = cog_deg;
578}
579
580void CommDriverSignalKNet::updateGnssSatellites(wxJSONValue &value,
581 const wxString &sfixtime) {
582#if 0
583 if (value.IsInt()) {
584 if (value.AsInt() > 0) {
585 m_frame->setSatelitesInView(value.AsInt());
586 g_priSats = 2;
587 }
588 } else if ((value.HasMember("count") && value["count"].IsInt())) {
589 m_frame->setSatelitesInView(value["count"].AsInt());
590 g_priSats = 3;
591 }
592#endif
593}
594
595void CommDriverSignalKNet::updateHeadingTrue(wxJSONValue &value,
596 const wxString &sfixtime) {
597 m_hdt = GEODESIC_RAD2DEG(value.AsDouble());
598}
599
600void CommDriverSignalKNet::updateHeadingMagnetic(
601 wxJSONValue &value, const wxString &sfixtime) {
602 m_hdm = GEODESIC_RAD2DEG(value.AsDouble());
603}
604
605void CommDriverSignalKNet::updateMagneticVariance(
606 wxJSONValue &value, const wxString &sfixtime) {
607 m_var = GEODESIC_RAD2DEG(value.AsDouble());
608}
609
610#endif
612
613// std::vector<unsigned char>* payload = p.get();
614//
615// // Extract the NMEA0183 sentence
616// std::string full_sentence = std::string(payload->begin(), payload->end());
const std::string iface
Physical device for 0183, else a unique string.
Definition comm_driver.h:97
DriverStats GetDriverStats() const override
Get the Driver Statistics.
Interface for handling incoming messages.
Definition comm_driver.h:52
virtual void Notify(std::shared_ptr< const NavMsg > message)=0
Handle a received message.
Define an action to be performed when a KeyProvider is notified.
Definition observable.h:247
void Init(const KeyProvider &kp, std::function< void(ObservedEvt &ev)> action)
Initiate an object yet not listening.
Definition observable.h:274
Custom event class for OpenCPN's notification system.
The JSON value class implementation.
Definition jsonval.h:84
int Size() const
Return the size of the array or map stored in this value.
Definition jsonval.cpp:1332
bool HasMember(unsigned index) const
Return TRUE if the object contains an element at the specified index.
Definition jsonval.cpp:1298
wxString AsString() const
Return the stored value as a wxWidget's string.
Definition jsonval.cpp:872
The JSON document writer.
Definition jsonwriter.h:50
void Write(const wxJSONValue &value, wxString &str)
Write the JSONvalue object to a JSON text.
Driver registration container, a singleton.
Raw messages layer, supports sending and receiving navmsg messages.
Driver statistics report.
unsigned rx_count
Number of bytes received since program start.
unsigned error_count
Number of detected errors since program start.
Suspend/resume and new devices events exchange point.