OpenCPN Partial API docs
Loading...
Searching...
No Matches
comm_drv_signalk_net.cpp
1/***************************************************************************
2 *
3 * Project: OpenCPN
4 * Purpose:
5 * Author: David Register, Alec Leamas
6 *
7 ***************************************************************************
8 * Copyright (C) 2022 by David Register, Alec Leamas *
9 * *
10 * This program is free software; you can redistribute it and/or modify *
11 * it under the terms of the GNU General Public License as published by *
12 * the Free Software Foundation; either version 2 of the License, or *
13 * (at your option) any later version. *
14 * *
15 * This program is distributed in the hope that it will be useful, *
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
18 * GNU General Public License for more details. *
19 * *
20 * You should have received a copy of the GNU General Public License *
21 * along with this program; if not, write to the *
22 * Free Software Foundation, Inc., *
23 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. *
24 **************************************************************************/
25
26#include <vector>
27#include <mutex> // std::mutex
28#include <queue> // std::queue
29#include <chrono>
30#include <thread>
31
32#include "rapidjson/document.h"
33
34#include "model/comm_drv_signalk_net.h"
37#include "model/geodesic.h"
38#include "model/sys_events.h"
39#include "wxServDisc.h"
40
41#include "observable.h"
42
43#include "ixwebsocket/IXNetSystem.h"
44#include "ixwebsocket/IXWebSocket.h"
45#include "ixwebsocket/IXUserAgent.h"
46#include "ixwebsocket/IXSocketTLSOptions.h"
47using namespace std::literals::chrono_literals;
48
49const int kTimerSocket = 9006;
50
51class CommDriverSignalKNetEvent; // fwd
52
53class CommDriverSignalKNetThread : public wxThread {
54public:
56 const wxString& PortName,
57 const wxString& strBaudRate);
58
60 void* Entry();
61 bool SetOutMsg(const wxString& msg);
62 void OnExit(void);
63
64private:
65 void ThreadMessage(const wxString& msg);
66 bool OpenComPortPhysical(const wxString& com_name, int baud_rate);
67 void CloseComPortPhysical();
68 size_t WriteComPortPhysical(std::vector<unsigned char> msg);
69 size_t WriteComPortPhysical(unsigned char* msg, size_t length);
70 void SetGatewayOperationMode(void);
71
72 CommDriverSignalKNet* m_pParentDriver;
73 wxString m_PortName;
74 wxString m_FullPortName;
75
76 unsigned char* put_ptr;
77 unsigned char* tak_ptr;
78
79 unsigned char* rx_buffer;
80
81 int m_baud;
82 int m_n_timeout;
83
84 // n2k_atomic_queue<char*> out_que;
85};
86
88wxDECLARE_EVENT(wxEVT_COMMDRIVER_SIGNALK_NET, CommDriverSignalKNetEvent);
89
90class CommDriverSignalKNetEvent : public wxEvent {
91public:
92 CommDriverSignalKNetEvent(wxEventType commandType = wxEVT_NULL, int id = 0)
93 : wxEvent(id, commandType) {};
95
96 // accessors
97 void SetPayload(std::shared_ptr<std::string> data) { m_payload = data; }
98 std::shared_ptr<std::string> GetPayload() { return m_payload; }
99
100 // required for sending with wxPostEvent()
101 wxEvent* Clone() const {
103 newevent->m_payload = this->m_payload;
104 return newevent;
105 };
106
107private:
108 std::shared_ptr<std::string> m_payload;
109};
110
111// WebSocket implementation
112
113class WebSocketThread : public wxThread {
114public:
115 WebSocketThread(CommDriverSignalKNet* parent, wxIPV4address address,
116 wxEvtHandler* consumer, const std::string& token);
117 virtual void* Entry();
118
119 DriverStats GetStats() const;
120
121private:
122 void HandleMessage(const std::string& message);
123 wxEvtHandler* s_wsSKConsumer;
124 wxIPV4address m_address;
125 wxEvtHandler* m_consumer;
126 CommDriverSignalKNet* m_parentStream;
127 std::string m_token;
128 ix::WebSocket ws;
129 ObsListener resume_listener;
130 DriverStats m_driver_stats;
131 mutable std::mutex m_stats_mutex;
132};
133
134WebSocketThread::WebSocketThread(CommDriverSignalKNet* parent,
135 wxIPV4address address, wxEvtHandler* consumer,
136 const std::string& token)
137 : m_address(address),
138 m_consumer(consumer),
139 m_parentStream(parent),
140 m_token(token) {
141 resume_listener.Init(SystemEvents::GetInstance().evt_resume,
142 [&](ObservedEvt& ev) {
143 ws.stop();
144 ws.start();
145 wxLogDebug("WebSocketThread: restarted");
146 });
147}
148
149void* WebSocketThread::Entry() {
150 using namespace std::chrono_literals;
151 bool not_done = true;
152
153 m_parentStream->SetThreadRunning(true);
154
155 s_wsSKConsumer = m_consumer;
156
157 wxString host = m_address.IPAddress();
158 int port = m_address.Service();
159
160 // Craft the address string
161 std::stringstream wsAddress;
162 wsAddress << "ws://" << host << ":" << port
163 << "/signalk/v1/stream?subscribe=all&sendCachedValues=false";
164 std::stringstream wssAddress;
165 wssAddress << "wss://" << host << ":" << port
166 << "/signalk/v1/stream?subscribe=all&sendCachedValues=false";
167
168 if (!m_token.empty()) {
169 wsAddress << "&token=" << m_token;
170 wssAddress << "&token=" << m_token;
171 }
172
173 ws.setUrl(wssAddress.str());
174 ix::SocketTLSOptions opt;
175 opt.disable_hostname_validation = true;
176 opt.caFile = "NONE";
177 ws.setTLSOptions(opt);
178 ws.setPingInterval(30);
179
180 auto message_callback = [&](const ix::WebSocketMessagePtr& msg) {
181 if (msg->type == ix::WebSocketMessageType::Message) {
182 HandleMessage(msg->str);
183 } else if (msg->type == ix::WebSocketMessageType::Open) {
184 wxLogDebug("websocket: Connection established");
185 std::lock_guard lock(m_stats_mutex);
186 m_driver_stats.available = true;
187 } else if (msg->type == ix::WebSocketMessageType::Close) {
188 wxLogDebug("websocket: Connection disconnected");
189 std::lock_guard lock(m_stats_mutex);
190 m_driver_stats.available = false;
191 } else if (msg->type == ix::WebSocketMessageType::Error) {
192 std::lock_guard lock(m_stats_mutex);
193 m_driver_stats.error_count++;
194 wxLogDebug("websocket: error: %s", msg->errorInfo.reason.c_str());
195 ws.getUrl() == wsAddress.str() ? ws.setUrl(wssAddress.str())
196 : ws.setUrl(wsAddress.str());
197 }
198 };
199
200 ws.setOnMessageCallback(message_callback);
201
202 {
203 std::lock_guard lock(m_stats_mutex);
204 m_driver_stats.driver_bus = NavAddr::Bus::Signalk;
205 m_driver_stats.driver_iface = m_parentStream->m_params.GetStrippedDSPort();
206 m_driver_stats.available = false;
207 }
208
209 ws.start();
210
211 while (m_parentStream->m_Thread_run_flag > 0) {
212 std::this_thread::sleep_for(100ms);
213 }
214
215 ws.stop();
216 m_parentStream->SetThreadRunning(false);
217 m_parentStream->m_Thread_run_flag = -1;
218 {
219 std::lock_guard lock(m_stats_mutex);
220 m_driver_stats.available = false;
221 }
222
223 return 0;
224}
225
226DriverStats WebSocketThread::GetStats() const {
227 std::lock_guard lock(m_stats_mutex);
228 return m_driver_stats;
229}
230
231void WebSocketThread::HandleMessage(const std::string& message) {
232 if (s_wsSKConsumer) {
233 CommDriverSignalKNetEvent signalKEvent(wxEVT_COMMDRIVER_SIGNALK_NET, 0);
234 auto buffer = std::make_shared<std::string>(message);
235
236 signalKEvent.SetPayload(buffer);
237 s_wsSKConsumer->AddPendingEvent(signalKEvent);
238 m_driver_stats.rx_count++;
239 }
240}
241
242//========================================================================
243/* CommDriverSignalKNet implementation
244 * */
245
246wxDEFINE_EVENT(wxEVT_COMMDRIVER_SIGNALK_NET, CommDriverSignalKNetEvent);
247
248CommDriverSignalKNet::CommDriverSignalKNet(const ConnectionParams* params,
249 DriverListener& listener)
250 : CommDriverSignalK(params->GetStrippedDSPort()),
251 m_Thread_run_flag(-1),
252 m_params(*params),
253 m_listener(listener),
254 m_stats_timer(*this, 2s) {
255 // Prepare the wxEventHandler to accept events from the actual hardware thread
256 Bind(wxEVT_COMMDRIVER_SIGNALK_NET, &CommDriverSignalKNet::handle_SK_sentence,
257 this);
258
259 m_addr.Hostname(params->NetworkAddress);
260 m_addr.Service(params->NetworkPort);
261 m_token = params->AuthToken;
262 m_socketread_watchdog_timer.SetOwner(this, kTimerSocket);
263 m_wsThread = NULL;
264 m_threadActive = false;
265
266 // Dummy Driver Stats, may be polled before worker thread is active
267 m_driver_stats.driver_bus = NavAddr::Bus::Signalk;
268 m_driver_stats.driver_iface = m_params.GetStrippedDSPort();
269 m_driver_stats.available = false;
270
271 Open();
272}
273
274CommDriverSignalKNet::~CommDriverSignalKNet() { Close(); }
275
276DriverStats CommDriverSignalKNet::GetDriverStats() const {
277 if (m_wsThread)
278 return m_wsThread->GetStats();
279 else
280 return m_driver_stats;
281}
282
283void CommDriverSignalKNet::Open(void) {
284 wxString discoveredIP;
285#if 0
286 int discoveredPort;
287#endif
288
289 // if (m_useWebSocket)
290 {
291 std::string serviceIdent =
292 std::string("_signalk-ws._tcp.local."); // Works for node.js server
293#if 0
294 if (m_params->AutoSKDiscover) {
295 if (DiscoverSKServer(serviceIdent, discoveredIP, discoveredPort,
296 1)) { // 1 second scan
297 wxLogDebug(wxString::Format(
298 _T("SK server autodiscovery finds WebSocket service: %s:%d"),
299 discoveredIP.c_str(), discoveredPort));
300 m_addr.Hostname(discoveredIP);
301 m_addr.Service(discoveredPort);
302
303 // Update the connection params, by pointer to item in global params
304 // array
305 ConnectionParams *params = (ConnectionParams *)m_params; // non-const
306 params->NetworkAddress = discoveredIP;
307 params->NetworkPort = discoveredPort;
308 } else
309 wxLogDebug(_T("SK server autodiscovery finds no WebSocket server."));
310 }
311#endif
312 OpenWebSocket();
313 }
314}
315void CommDriverSignalKNet::Close() { CloseWebSocket(); }
316
317bool CommDriverSignalKNet::DiscoverSKServer(std::string serviceIdent,
318 wxString& ip, int& port, int tSec) {
319 wxServDisc* servscan =
320 new wxServDisc(0, wxString(serviceIdent.c_str()), QTYPE_PTR);
321
322 for (int i = 0; i < 10; i++) {
323 if (servscan->getResultCount()) {
324 auto result = servscan->getResults().at(0);
325 delete servscan;
326
327 wxServDisc* namescan = new wxServDisc(0, result.name, QTYPE_SRV);
328 for (int j = 0; j < 10; j++) {
329 if (namescan->getResultCount()) {
330 auto namescanResult = namescan->getResults().at(0);
331 port = namescanResult.port;
332 delete namescan;
333
334 wxServDisc* addrscan =
335 new wxServDisc(0, namescanResult.name, QTYPE_A);
336 for (int k = 0; k < 10; k++) {
337 if (addrscan->getResultCount()) {
338 auto addrscanResult = addrscan->getResults().at(0);
339 ip = addrscanResult.ip;
340 delete addrscan;
341 return true;
342 break;
343 } else {
344 wxYield();
345 wxMilliSleep(1000 * tSec / 10);
346 }
347 }
348 delete addrscan;
349 return false;
350 } else {
351 wxYield();
352 wxMilliSleep(1000 * tSec / 10);
353 }
354 }
355 delete namescan;
356 return false;
357 } else {
358 wxYield();
359 wxMilliSleep(1000 * tSec / 10);
360 }
361 }
362
363 delete servscan;
364 return false;
365}
366
367void CommDriverSignalKNet::OpenWebSocket() {
368 // printf("OpenWebSocket\n");
369 wxLogMessage(wxString::Format(_T("Opening Signal K WebSocket client: %s"),
370 m_params.GetDSPort().c_str()));
371
372 // Start a thread to run the client without blocking
373
374 m_wsThread = new WebSocketThread(this, GetAddr(), this, m_token);
375 if (m_wsThread->Create() != wxTHREAD_NO_ERROR) {
376 wxLogError(wxT("Can't create WebSocketThread!"));
377
378 return;
379 }
380
381 ResetWatchdog();
382 GetSocketThreadWatchdogTimer()->Start(1000,
383 wxTIMER_ONE_SHOT); // Start the dog
384 SetThreadRunFlag(1);
385
386 m_wsThread->Run();
387}
388
389void CommDriverSignalKNet::CloseWebSocket() {
390 if (m_wsThread) {
391 if (IsThreadRunning()) {
392 wxLogMessage(_T("Stopping Secondary SignalK Thread"));
393
394 m_Thread_run_flag = 0;
395 int tsec = 10;
396 while (IsThreadRunning() && tsec) {
397 wxSleep(1);
398 tsec--;
399 }
400
401 wxString msg;
402 if (m_Thread_run_flag <= 0)
403 msg.Printf(_T("Stopped in %d sec."), 10 - tsec);
404 else
405 msg.Printf(_T("Not Stopped after 10 sec."));
406 wxLogMessage(msg);
407 }
408
409 wxMilliSleep(100);
410
411#if 0
412 m_thread_run_flag = 0;
413 printf("sending delete\n");
414 m_wsThread->Delete();
415 wxMilliSleep(100);
416
417 int nDeadman = 0;
418 while (IsThreadRunning() && (++nDeadman < 200)) { // spin for max 2 secs.
419 wxMilliSleep(10);
420 }
421 printf("Closed in %d\n", nDeadman);
422 wxMilliSleep(100);
423#endif
424 }
425}
426
427void CommDriverSignalKNet::handle_SK_sentence(
429 rapidjson::Document root;
430
431 // LOG_DEBUG("%s\n", msg.c_str());
432
433 std::string* msg = event.GetPayload().get();
434 std::string msgTerminated = *msg;
435 msgTerminated.append("\r\n");
436
437 root.Parse(*msg);
438 if (root.HasParseError()) {
439 wxLogMessage(wxString::Format(
440 _T("SignalKDataStream ERROR: the JSON document is not well-formed:%d"),
441 root.GetParseError()));
442 return;
443 }
444
445 if (!root.IsObject()) {
446 wxLogMessage(wxString::Format(
447 _T("SignalKDataStream ERROR: Message is not a JSON Object: %s"),
448 msg->c_str()));
449 return;
450 }
451
452 // Decode just enough of string to extract some identifiers
453 // such as the sK version, "self" context, and target context
454 if (root.HasMember("version")) {
455 wxString msg = _T("Connected to Signal K server version: ");
456 msg << (root["version"].GetString());
457 wxLogMessage(msg);
458 }
459
460 if (root.HasMember("self")) {
461 if (strncmp(root["self"].GetString(), "vessels.", 8) == 0)
462 m_self = (root["self"].GetString()); // for java server, and OpenPlotter
463 // node.js server 1.20
464 else
465 m_self = std::string("vessels.")
466 .append(root["self"].GetString()); // for Node.js server
467 }
468
469 if (root.HasMember("context") && root["context"].IsString()) {
470 m_context = root["context"].GetString();
471 }
472
473 // Notify all listeners
474 auto pos = iface.find(":");
475 std::string comm_interface = "";
476 if (pos != std::string::npos) comm_interface = iface.substr(pos + 1);
477 auto navmsg = std::make_shared<const SignalkMsg>(
478 m_self, m_context, msgTerminated, comm_interface);
479 m_listener.Notify(std::move(navmsg));
480}
481
482void CommDriverSignalKNet::initIXNetSystem() { ix::initNetSystem(); };
483
484void CommDriverSignalKNet::uninitIXNetSystem() { ix::uninitNetSystem(); };
485
486#if 0
487void CommDriverSignalKNet::handleUpdate(wxJSONValue &update) {
488 wxString sfixtime = "";
489
490 if (update.HasMember("timestamp")) {
491 sfixtime = update["timestamp"].AsString();
492 }
493 if (update.HasMember("values") && update["values"].IsArray()) {
494 for (int j = 0; j < update["values"].Size(); ++j) {
495 wxJSONValue &item = update["values"][j];
496 updateItem(item, sfixtime);
497 }
498 }
499}
500
501void CommDriverSignalKNet::updateItem(wxJSONValue &item,
502 wxString &sfixtime) {
503 if (item.HasMember("path") && item.HasMember("value")) {
504 const wxString &update_path = item["path"].AsString();
505 wxJSONValue &value = item["value"];
506
507 if (update_path == _T("navigation.position") && !value.IsNull()) {
508 updateNavigationPosition(value, sfixtime);
509 } else if (update_path == _T("navigation.speedOverGround") &&
510 m_bGPSValid_SK && !value.IsNull()) {
511 updateNavigationSpeedOverGround(value, sfixtime);
512 } else if (update_path == _T("navigation.courseOverGroundTrue") &&
513 m_bGPSValid_SK && !value.IsNull()) {
514 updateNavigationCourseOverGround(value, sfixtime);
515 } else if (update_path == _T("navigation.courseOverGroundMagnetic")) {
516 }
517 else if (update_path ==
518 _T("navigation.gnss.satellites")) // From GGA sats in use
519 {
520 /*if (g_priSats >= 2)*/ updateGnssSatellites(value, sfixtime);
521 } else if (update_path ==
522 _T("navigation.gnss.satellitesInView")) // From GSV sats in view
523 {
524 /*if (g_priSats >= 3)*/ updateGnssSatellites(value, sfixtime);
525 } else if (update_path == _T("navigation.headingTrue")) {
526 if(!value.IsNull())
527 updateHeadingTrue(value, sfixtime);
528 } else if (update_path == _T("navigation.headingMagnetic")) {
529 if(!value.IsNull())
530 updateHeadingMagnetic(value, sfixtime);
531 } else if (update_path == _T("navigation.magneticVariation")) {
532 if(!value.IsNull())
533 updateMagneticVariance(value, sfixtime);
534 } else {
535 // wxLogMessage(wxString::Format(_T("** Signal K unhandled update: %s"),
536 // update_path));
537#if 0
538 wxString dbg;
539 wxJSONWriter writer;
540 writer.Write(item, dbg);
541 wxString msg( _T("update: ") );
542 msg.append(dbg);
543 wxLogMessage(msg);
544#endif
545 }
546 }
547}
548
549void CommDriverSignalKNet::updateNavigationPosition(
550 wxJSONValue &value, const wxString &sfixtime) {
551 if ((value.HasMember("latitude" && value["latitude"].IsDouble())) &&
552 (value.HasMember("longitude") && value["longitude"].IsDouble())) {
553 // wxLogMessage(_T(" ***** Position Update"));
554 m_lat = value["latitude"].AsDouble();
555 m_lon = value["longitude"].AsDouble();
556 m_bGPSValid_SK = true;
557 } else {
558 m_bGPSValid_SK = false;
559 }
560}
561
562
563void CommDriverSignalKNet::updateNavigationSpeedOverGround(
564 wxJSONValue &value, const wxString &sfixtime){
565 double sog_ms = value.AsDouble();
566 double sog_knot = sog_ms * ms_to_knot_factor;
567 // wxLogMessage(wxString::Format(_T(" ***** SOG: %f, %f"), sog_ms, sog_knot));
568 m_sog = sog_knot;
569}
570
571void CommDriverSignalKNet::updateNavigationCourseOverGround(
572 wxJSONValue &value, const wxString &sfixtime) {
573 double cog_rad = value.AsDouble();
574 double cog_deg = GEODESIC_RAD2DEG(cog_rad);
575 // wxLogMessage(wxString::Format(_T(" ***** COG: %f, %f"), cog_rad, cog_deg));
576 m_cog = cog_deg;
577}
578
579void CommDriverSignalKNet::updateGnssSatellites(wxJSONValue &value,
580 const wxString &sfixtime) {
581#if 0
582 if (value.IsInt()) {
583 if (value.AsInt() > 0) {
584 m_frame->setSatelitesInView(value.AsInt());
585 g_priSats = 2;
586 }
587 } else if ((value.HasMember("count") && value["count"].IsInt())) {
588 m_frame->setSatelitesInView(value["count"].AsInt());
589 g_priSats = 3;
590 }
591#endif
592}
593
594void CommDriverSignalKNet::updateHeadingTrue(wxJSONValue &value,
595 const wxString &sfixtime) {
596 m_hdt = GEODESIC_RAD2DEG(value.AsDouble());
597}
598
599void CommDriverSignalKNet::updateHeadingMagnetic(
600 wxJSONValue &value, const wxString &sfixtime) {
601 m_hdm = GEODESIC_RAD2DEG(value.AsDouble());
602}
603
604void CommDriverSignalKNet::updateMagneticVariance(
605 wxJSONValue &value, const wxString &sfixtime) {
606 m_var = GEODESIC_RAD2DEG(value.AsDouble());
607}
608
609#endif
611
612// std::vector<unsigned char>* payload = p.get();
613//
614// // Extract the NMEA0183 sentence
615// std::string full_sentence = std::string(payload->begin(), payload->end());
const std::string iface
Physical device for 0183, else a unique string.
Definition comm_driver.h:88
Interface implemented by transport layer and possible other parties like test code which should handl...
Definition comm_driver.h:48
virtual void Notify(std::shared_ptr< const NavMsg > message)=0
Handle a received message.
Define an action to be performed when a KeyProvider is notified.
Definition observable.h:228
void Init(const KeyProvider &kp, std::function< void(ObservedEvt &ev)> action)
Initiate an object yet not listening.
Definition observable.h:255
Custom event class for OpenCPN's notification system.
The JSON value class implementation.
Definition jsonval.h:84
int Size() const
Return the size of the array or map stored in this value.
Definition jsonval.cpp:1332
bool HasMember(unsigned index) const
Return TRUE if the object contains an element at the specified index.
Definition jsonval.cpp:1298
wxString AsString() const
Return the stored value as a wxWidget's string.
Definition jsonval.cpp:872
The JSON document writer.
Definition jsonwriter.h:50
void Write(const wxJSONValue &value, wxString &str)
Write the JSONvalue object to a JSON text.
Driver registration container, a singleton.
Raw messages layer, supports sending and recieving navmsg messages.
Driver statistics report.
unsigned rx_count
Number of bytes received since program start.
unsigned error_count
Number of detected errors since program start.
Suspend/resume and new devices events exchange point.